1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.DefaultPriorityQueue;
19 import io.netty.util.internal.ObjectUtil;
20 import io.netty.util.internal.PriorityQueue;
21
22 import java.util.Comparator;
23 import java.util.Queue;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.TimeUnit;
26
27
28
29
30 public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
31 private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
32 new Comparator<ScheduledFutureTask<?>>() {
33 @Override
34 public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
35 return o1.compareTo(o2);
36 }
37 };
38
39 private static final long START_TIME = System.nanoTime();
40
41 static final Runnable WAKEUP_TASK = new Runnable() {
42 @Override
43 public void run() { }
44 };
45
46 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
47
48 long nextTaskId;
49
50 protected AbstractScheduledEventExecutor() {
51 }
52
53 protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
54 super(parent);
55 }
56
57
58
59
60
61
62
63
64
65
66
67 protected long getCurrentTimeNanos() {
68 return defaultCurrentTimeNanos();
69 }
70
71
72
73
74 @Deprecated
75 protected static long nanoTime() {
76 return defaultCurrentTimeNanos();
77 }
78
79 static long defaultCurrentTimeNanos() {
80 return System.nanoTime() - START_TIME;
81 }
82
83 static long deadlineNanos(long nanoTime, long delay) {
84 long deadlineNanos = nanoTime + delay;
85
86 return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
87 }
88
89
90
91
92
93
94
95 protected static long deadlineToDelayNanos(long deadlineNanos) {
96 return ScheduledFutureTask.deadlineToDelayNanos(defaultCurrentTimeNanos(), deadlineNanos);
97 }
98
99
100
101
102
103 protected static long initialNanoTime() {
104 return START_TIME;
105 }
106
107 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
108 if (scheduledTaskQueue == null) {
109 scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
110 SCHEDULED_FUTURE_TASK_COMPARATOR,
111
112 11);
113 }
114 return scheduledTaskQueue;
115 }
116
117 private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
118 return queue == null || queue.isEmpty();
119 }
120
121
122
123
124
125
126 protected void cancelScheduledTasks() {
127 assert inEventLoop();
128 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
129 if (isNullOrEmpty(scheduledTaskQueue)) {
130 return;
131 }
132
133 final ScheduledFutureTask<?>[] scheduledTasks =
134 scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);
135
136 for (ScheduledFutureTask<?> task: scheduledTasks) {
137 task.cancelWithoutRemove(false);
138 }
139
140 scheduledTaskQueue.clearIgnoringIndexes();
141 }
142
143
144
145
146 protected final Runnable pollScheduledTask() {
147 return pollScheduledTask(getCurrentTimeNanos());
148 }
149
150
151
152
153
154 protected final Runnable pollScheduledTask(long nanoTime) {
155 assert inEventLoop();
156
157 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
158 if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
159 return null;
160 }
161 scheduledTaskQueue.remove();
162 scheduledTask.setConsumed();
163 return scheduledTask;
164 }
165
166
167
168
169 protected final long nextScheduledTaskNano() {
170 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
171 return scheduledTask != null ? scheduledTask.delayNanos() : -1;
172 }
173
174
175
176
177
178 protected final long nextScheduledTaskDeadlineNanos() {
179 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
180 return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
181 }
182
183 final ScheduledFutureTask<?> peekScheduledTask() {
184 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
185 return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
186 }
187
188
189
190
191 protected final boolean hasScheduledTasks() {
192 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
193 return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
194 }
195
196 @Override
197 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
198 ObjectUtil.checkNotNull(command, "command");
199 ObjectUtil.checkNotNull(unit, "unit");
200 if (delay < 0) {
201 delay = 0;
202 }
203 validateScheduled0(delay, unit);
204
205 return schedule(new ScheduledFutureTask<Void>(
206 this,
207 command,
208 deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
209 }
210
211 @Override
212 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
213 ObjectUtil.checkNotNull(callable, "callable");
214 ObjectUtil.checkNotNull(unit, "unit");
215 if (delay < 0) {
216 delay = 0;
217 }
218 validateScheduled0(delay, unit);
219
220 return schedule(new ScheduledFutureTask<V>(
221 this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
222 }
223
224 @Override
225 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
226 ObjectUtil.checkNotNull(command, "command");
227 ObjectUtil.checkNotNull(unit, "unit");
228 if (initialDelay < 0) {
229 throw new IllegalArgumentException(
230 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
231 }
232 if (period <= 0) {
233 throw new IllegalArgumentException(
234 String.format("period: %d (expected: > 0)", period));
235 }
236 validateScheduled0(initialDelay, unit);
237 validateScheduled0(period, unit);
238
239 return schedule(new ScheduledFutureTask<Void>(
240 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period)));
241 }
242
243 @Override
244 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
245 ObjectUtil.checkNotNull(command, "command");
246 ObjectUtil.checkNotNull(unit, "unit");
247 if (initialDelay < 0) {
248 throw new IllegalArgumentException(
249 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
250 }
251 if (delay <= 0) {
252 throw new IllegalArgumentException(
253 String.format("delay: %d (expected: > 0)", delay));
254 }
255
256 validateScheduled0(initialDelay, unit);
257 validateScheduled0(delay, unit);
258
259 return schedule(new ScheduledFutureTask<Void>(
260 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay)));
261 }
262
263 @SuppressWarnings("deprecation")
264 private void validateScheduled0(long amount, TimeUnit unit) {
265 validateScheduled(amount, unit);
266 }
267
268
269
270
271
272
273 @Deprecated
274 protected void validateScheduled(long amount, TimeUnit unit) {
275
276 }
277
278 final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
279
280 scheduledTaskQueue().add(task.setId(++nextTaskId));
281 }
282
283 private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
284 if (inEventLoop()) {
285 scheduleFromEventLoop(task);
286 } else {
287 final long deadlineNanos = task.deadlineNanos();
288
289 if (beforeScheduledTaskSubmitted(deadlineNanos)) {
290 execute(task);
291 } else {
292 lazyExecute(task);
293
294 if (afterScheduledTaskSubmitted(deadlineNanos)) {
295 execute(WAKEUP_TASK);
296 }
297 }
298 }
299
300 return task;
301 }
302
303 final void removeScheduled(final ScheduledFutureTask<?> task) {
304 assert task.isCancelled();
305 if (inEventLoop()) {
306 scheduledTaskQueue().removeTyped(task);
307 } else {
308
309 scheduleRemoveScheduled(task);
310 }
311 }
312
313 void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
314
315 lazyExecute(task);
316 }
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331 protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
332 return true;
333 }
334
335
336
337
338
339
340
341 protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
342 return true;
343 }
344 }