View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.DefaultPriorityQueue;
19  import io.netty.util.internal.ObjectUtil;
20  import io.netty.util.internal.PriorityQueue;
21  
22  import java.util.Comparator;
23  import java.util.Queue;
24  import java.util.concurrent.Callable;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.TimeUnit;
27  
28  /**
29   * Abstract base class for {@link EventExecutor}s that want to support scheduling.
30   */
31  public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
32  
33      private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
34              new Comparator<ScheduledFutureTask<?>>() {
35                  @Override
36                  public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
37                      return o1.compareTo(o2);
38                  }
39              };
40  
41      PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
42  
43      protected AbstractScheduledEventExecutor() {
44      }
45  
46      protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
47          super(parent);
48      }
49  
50      protected static long nanoTime() {
51          return ScheduledFutureTask.nanoTime();
52      }
53  
54      PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
55          if (scheduledTaskQueue == null) {
56              scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
57                      SCHEDULED_FUTURE_TASK_COMPARATOR,
58                      // Use same initial capacity as java.util.PriorityQueue
59                      11);
60          }
61          return scheduledTaskQueue;
62      }
63  
64      private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
65          return queue == null || queue.isEmpty();
66      }
67  
68      /**
69       * Cancel all scheduled tasks.
70       *
71       * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
72       */
73      protected void cancelScheduledTasks() {
74          assert inEventLoop();
75          PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
76          if (isNullOrEmpty(scheduledTaskQueue)) {
77              return;
78          }
79  
80          final ScheduledFutureTask<?>[] scheduledTasks =
81                  scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);
82  
83          for (ScheduledFutureTask<?> task: scheduledTasks) {
84              task.cancelWithoutRemove(false);
85          }
86  
87          scheduledTaskQueue.clearIgnoringIndexes();
88      }
89  
90      /**
91       * @see #pollScheduledTask(long)
92       */
93      protected final Runnable pollScheduledTask() {
94          return pollScheduledTask(nanoTime());
95      }
96  
97      /**
98       * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
99       * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
100      */
101     protected final Runnable pollScheduledTask(long nanoTime) {
102         assert inEventLoop();
103 
104         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
105         ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
106         if (scheduledTask == null) {
107             return null;
108         }
109 
110         if (scheduledTask.deadlineNanos() <= nanoTime) {
111             scheduledTaskQueue.remove();
112             return scheduledTask;
113         }
114         return null;
115     }
116 
117     /**
118      * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
119      */
120     protected final long nextScheduledTaskNano() {
121         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
122         ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
123         if (scheduledTask == null) {
124             return -1;
125         }
126         return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());
127     }
128 
129     final ScheduledFutureTask<?> peekScheduledTask() {
130         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
131         if (scheduledTaskQueue == null) {
132             return null;
133         }
134         return scheduledTaskQueue.peek();
135     }
136 
137     /**
138      * Returns {@code true} if a scheduled task is ready for processing.
139      */
140     protected final boolean hasScheduledTasks() {
141         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
142         ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
143         return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
144     }
145 
146     @Override
147     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
148         ObjectUtil.checkNotNull(command, "command");
149         ObjectUtil.checkNotNull(unit, "unit");
150         if (delay < 0) {
151             delay = 0;
152         }
153         validateScheduled0(delay, unit);
154 
155         return schedule(new ScheduledFutureTask<Void>(
156                 this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
157     }
158 
159     @Override
160     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
161         ObjectUtil.checkNotNull(callable, "callable");
162         ObjectUtil.checkNotNull(unit, "unit");
163         if (delay < 0) {
164             delay = 0;
165         }
166         validateScheduled0(delay, unit);
167 
168         return schedule(new ScheduledFutureTask<V>(
169                 this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
170     }
171 
172     @Override
173     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
174         ObjectUtil.checkNotNull(command, "command");
175         ObjectUtil.checkNotNull(unit, "unit");
176         if (initialDelay < 0) {
177             throw new IllegalArgumentException(
178                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
179         }
180         if (period <= 0) {
181             throw new IllegalArgumentException(
182                     String.format("period: %d (expected: > 0)", period));
183         }
184         validateScheduled0(initialDelay, unit);
185         validateScheduled0(period, unit);
186 
187         return schedule(new ScheduledFutureTask<Void>(
188                 this, Executors.<Void>callable(command, null),
189                 ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
190     }
191 
192     @Override
193     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
194         ObjectUtil.checkNotNull(command, "command");
195         ObjectUtil.checkNotNull(unit, "unit");
196         if (initialDelay < 0) {
197             throw new IllegalArgumentException(
198                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
199         }
200         if (delay <= 0) {
201             throw new IllegalArgumentException(
202                     String.format("delay: %d (expected: > 0)", delay));
203         }
204 
205         validateScheduled0(initialDelay, unit);
206         validateScheduled0(delay, unit);
207 
208         return schedule(new ScheduledFutureTask<Void>(
209                 this, Executors.<Void>callable(command, null),
210                 ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
211     }
212 
213     @SuppressWarnings("deprecation")
214     private void validateScheduled0(long amount, TimeUnit unit) {
215         validateScheduled(amount, unit);
216     }
217 
218     /**
219      * Sub-classes may override this to restrict the maximal amount of time someone can use to schedule a task.
220      *
221      * @deprecated will be removed in the future.
222      */
223     @Deprecated
224     protected void validateScheduled(long amount, TimeUnit unit) {
225         // NOOP
226     }
227 
228     <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
229         if (inEventLoop()) {
230             scheduledTaskQueue().add(task);
231         } else {
232             execute(new Runnable() {
233                 @Override
234                 public void run() {
235                     scheduledTaskQueue().add(task);
236                 }
237             });
238         }
239 
240         return task;
241     }
242 
243     final void removeScheduled(final ScheduledFutureTask<?> task) {
244         if (inEventLoop()) {
245             scheduledTaskQueue().removeTyped(task);
246         } else {
247             execute(new Runnable() {
248                 @Override
249                 public void run() {
250                     removeScheduled(task);
251                 }
252             });
253         }
254     }
255 }