View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.DefaultPriorityQueue;
19  import io.netty.util.internal.ObjectUtil;
20  import io.netty.util.internal.PriorityQueue;
21  
22  import java.util.Comparator;
23  import java.util.Queue;
24  import java.util.concurrent.Callable;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.TimeUnit;
27  
28  /**
29   * Abstract base class for {@link EventExecutor}s that want to support scheduling.
30   */
31  public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
32  
33      private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
34              new Comparator<ScheduledFutureTask<?>>() {
35                  @Override
36                  public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
37                      return o1.compareTo(o2);
38                  }
39              };
40  
41      PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
42  
43      protected AbstractScheduledEventExecutor() {
44      }
45  
46      protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
47          super(parent);
48      }
49  
50      protected static long nanoTime() {
51          return ScheduledFutureTask.nanoTime();
52      }
53  
54      PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
55          if (scheduledTaskQueue == null) {
56              scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
57                      SCHEDULED_FUTURE_TASK_COMPARATOR,
58                      // Use same initial capacity as java.util.PriorityQueue
59                      11);
60          }
61          return scheduledTaskQueue;
62      }
63  
64      private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
65          return queue == null || queue.isEmpty();
66      }
67  
68      /**
69       * Cancel all scheduled tasks.
70       *
71       * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
72       */
73      protected void cancelScheduledTasks() {
74          assert inEventLoop();
75          PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
76          if (isNullOrEmpty(scheduledTaskQueue)) {
77              return;
78          }
79  
80          final ScheduledFutureTask<?>[] scheduledTasks =
81                  scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);
82  
83          for (ScheduledFutureTask<?> task: scheduledTasks) {
84              task.cancelWithoutRemove(false);
85          }
86  
87          scheduledTaskQueue.clearIgnoringIndexes();
88      }
89  
90      /**
91       * @see #pollScheduledTask(long)
92       */
93      protected final Runnable pollScheduledTask() {
94          return pollScheduledTask(nanoTime());
95      }
96  
97      /**
98       * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
99       * You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}.
100      */
101     protected final Runnable pollScheduledTask(long nanoTime) {
102         assert inEventLoop();
103 
104         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
105         ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
106         if (scheduledTask == null) {
107             return null;
108         }
109 
110         if (scheduledTask.deadlineNanos() <= nanoTime) {
111             scheduledTaskQueue.remove();
112             return scheduledTask;
113         }
114         return null;
115     }
116 
117     /**
118      * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
119      */
120     protected final long nextScheduledTaskNano() {
121         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
122         ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
123         if (scheduledTask == null) {
124             return -1;
125         }
126         return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());
127     }
128 
129     final ScheduledFutureTask<?> peekScheduledTask() {
130         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
131         if (scheduledTaskQueue == null) {
132             return null;
133         }
134         return scheduledTaskQueue.peek();
135     }
136 
137     /**
138      * Returns {@code true} if a scheduled task is ready for processing.
139      */
140     protected final boolean hasScheduledTasks() {
141         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
142         ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
143         return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
144     }
145 
146     @Override
147     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
148         ObjectUtil.checkNotNull(command, "command");
149         ObjectUtil.checkNotNull(unit, "unit");
150         if (delay < 0) {
151             delay = 0;
152         }
153         return schedule(new ScheduledFutureTask<Void>(
154                 this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
155     }
156 
157     @Override
158     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
159         ObjectUtil.checkNotNull(callable, "callable");
160         ObjectUtil.checkNotNull(unit, "unit");
161         if (delay < 0) {
162             delay = 0;
163         }
164         return schedule(new ScheduledFutureTask<V>(
165                 this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
166     }
167 
168     @Override
169     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
170         ObjectUtil.checkNotNull(command, "command");
171         ObjectUtil.checkNotNull(unit, "unit");
172         if (initialDelay < 0) {
173             throw new IllegalArgumentException(
174                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
175         }
176         if (period <= 0) {
177             throw new IllegalArgumentException(
178                     String.format("period: %d (expected: > 0)", period));
179         }
180 
181         return schedule(new ScheduledFutureTask<Void>(
182                 this, Executors.<Void>callable(command, null),
183                 ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
184     }
185 
186     @Override
187     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
188         ObjectUtil.checkNotNull(command, "command");
189         ObjectUtil.checkNotNull(unit, "unit");
190         if (initialDelay < 0) {
191             throw new IllegalArgumentException(
192                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
193         }
194         if (delay <= 0) {
195             throw new IllegalArgumentException(
196                     String.format("delay: %d (expected: > 0)", delay));
197         }
198 
199         return schedule(new ScheduledFutureTask<Void>(
200                 this, Executors.<Void>callable(command, null),
201                 ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
202     }
203 
204     <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
205         if (inEventLoop()) {
206             scheduledTaskQueue().add(task);
207         } else {
208             execute(new Runnable() {
209                 @Override
210                 public void run() {
211                     scheduledTaskQueue().add(task);
212                 }
213             });
214         }
215 
216         return task;
217     }
218 
219     final void removeScheduled(final ScheduledFutureTask<?> task) {
220         if (inEventLoop()) {
221             scheduledTaskQueue().removeTyped(task);
222         } else {
223             execute(new Runnable() {
224                 @Override
225                 public void run() {
226                     removeScheduled(task);
227                 }
228             });
229         }
230     }
231 }