View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.CallableEventExecutorAdapter;
19  import io.netty.util.internal.ObjectUtil;
20  import io.netty.util.internal.RunnableEventExecutorAdapter;
21  
22  import java.util.Iterator;
23  import java.util.PriorityQueue;
24  import java.util.Queue;
25  import java.util.concurrent.Callable;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.TimeUnit;
28  
29  /**
30   * Abstract base class for {@link EventExecutor}s that want to support scheduling.
31   */
32  public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
33  
34      Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
35  
36      protected AbstractScheduledEventExecutor() {
37      }
38  
39      protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
40          super(parent);
41      }
42  
43      protected static long nanoTime() {
44          return ScheduledFutureTask.nanoTime();
45      }
46  
47      Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
48          if (scheduledTaskQueue == null) {
49              scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
50          }
51          return scheduledTaskQueue;
52      }
53  
54      private static  boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
55          return queue == null || queue.isEmpty();
56      }
57  
58      /**
59       * Cancel all scheduled tasks.
60       *
61       * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
62       */
63      protected void cancelScheduledTasks() {
64          assert inEventLoop();
65          Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
66          if (isNullOrEmpty(scheduledTaskQueue)) {
67              return;
68          }
69  
70          final ScheduledFutureTask<?>[] scheduledTasks =
71                  scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);
72  
73          for (ScheduledFutureTask<?> task: scheduledTasks) {
74              task.cancel(false);
75          }
76  
77          scheduledTaskQueue.clear();
78      }
79  
80      /**
81       * @see {@link #pollScheduledTask(long)}
82       */
83      protected final Runnable pollScheduledTask() {
84          return pollScheduledTask(nanoTime());
85      }
86  
87      /**
88       * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
89       * You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}.
90       */
91      protected final Runnable pollScheduledTask(long nanoTime) {
92          assert inEventLoop();
93  
94          Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
95          ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
96          if (scheduledTask == null) {
97              return null;
98          }
99  
100         if (scheduledTask.deadlineNanos() <= nanoTime) {
101             scheduledTaskQueue.remove();
102             return scheduledTask;
103         }
104         return null;
105     }
106 
107     /**
108      * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
109      */
110     protected final long nextScheduledTaskNano() {
111         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
112         ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
113         if (scheduledTask == null) {
114             return -1;
115         }
116         return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());
117     }
118 
119     final ScheduledFutureTask<?> peekScheduledTask() {
120         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
121         if (scheduledTaskQueue == null) {
122             return null;
123         }
124         return scheduledTaskQueue.peek();
125     }
126 
127     /**
128      * Returns {@code true} if a scheduled task is ready for processing.
129      */
130     protected final boolean hasScheduledTasks() {
131         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
132         ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
133         return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
134     }
135 
136     @Override
137     public  ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
138         ObjectUtil.checkNotNull(command, "command");
139         ObjectUtil.checkNotNull(unit, "unit");
140         if (delay < 0) {
141             throw new IllegalArgumentException(
142                     String.format("delay: %d (expected: >= 0)", delay));
143         }
144         return schedule(new ScheduledFutureTask<Void>(
145                 this, toCallable(command), ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
146     }
147 
148     @Override
149     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
150         ObjectUtil.checkNotNull(callable, "callable");
151         ObjectUtil.checkNotNull(unit, "unit");
152         if (delay < 0) {
153             throw new IllegalArgumentException(
154                     String.format("delay: %d (expected: >= 0)", delay));
155         }
156         return schedule(new ScheduledFutureTask<V>(
157                 this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
158     }
159 
160     @Override
161     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
162         ObjectUtil.checkNotNull(command, "command");
163         ObjectUtil.checkNotNull(unit, "unit");
164         if (initialDelay < 0) {
165             throw new IllegalArgumentException(
166                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
167         }
168         if (period <= 0) {
169             throw new IllegalArgumentException(
170                     String.format("period: %d (expected: > 0)", period));
171         }
172 
173         return schedule(new ScheduledFutureTask<Void>(
174                 this, toCallable(command),
175                 ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
176     }
177 
178     @Override
179     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
180         ObjectUtil.checkNotNull(command, "command");
181         ObjectUtil.checkNotNull(unit, "unit");
182         if (initialDelay < 0) {
183             throw new IllegalArgumentException(
184                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
185         }
186         if (delay <= 0) {
187             throw new IllegalArgumentException(
188                     String.format("delay: %d (expected: > 0)", delay));
189         }
190 
191         return schedule(new ScheduledFutureTask<Void>(
192                 this, toCallable(command),
193                 ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
194     }
195 
196     <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
197         if (inEventLoop()) {
198             scheduledTaskQueue().add(task);
199         } else {
200             execute(new Runnable() {
201                 @Override
202                 public void run() {
203                     scheduledTaskQueue().add(task);
204                 }
205             });
206         }
207 
208         return task;
209     }
210 
211     void purgeCancelledScheduledTasks() {
212         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
213         if (isNullOrEmpty(scheduledTaskQueue)) {
214             return;
215         }
216         Iterator<ScheduledFutureTask<?>> i = scheduledTaskQueue.iterator();
217         while (i.hasNext()) {
218             ScheduledFutureTask<?> task = i.next();
219             if (task.isCancelled()) {
220                 i.remove();
221             }
222         }
223     }
224 
225     private static Callable<Void> toCallable(final Runnable command) {
226         if (command instanceof RunnableEventExecutorAdapter) {
227             return new RunnableToCallableAdapter((RunnableEventExecutorAdapter) command);
228         } else {
229             return Executors.callable(command, null);
230         }
231     }
232 
233     private static class RunnableToCallableAdapter implements CallableEventExecutorAdapter<Void> {
234 
235         final RunnableEventExecutorAdapter runnable;
236 
237         RunnableToCallableAdapter(RunnableEventExecutorAdapter runnable) {
238             this.runnable = runnable;
239         }
240 
241         @Override
242         public EventExecutor executor() {
243             return runnable.executor();
244         }
245 
246         @Override
247         public Callable<Void> unwrap() {
248             return null;
249         }
250 
251         @Override
252         public Void call() throws Exception {
253             runnable.run();
254             return null;
255         }
256     }
257 }