1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.DefaultPriorityQueue;
19 import io.netty.util.internal.ObjectUtil;
20 import io.netty.util.internal.PriorityQueue;
21
22 import java.util.Comparator;
23 import java.util.Objects;
24 import java.util.Queue;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.TimeUnit;
27
28
29
30
31 public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
32 private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
33 new Comparator<ScheduledFutureTask<?>>() {
34 @Override
35 public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
36 return o1.compareTo(o2);
37 }
38 };
39
40 static final Runnable WAKEUP_TASK = new Runnable() {
41 @Override
42 public void run() { }
43 };
44
45 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
46
47 long nextTaskId;
48
49 protected AbstractScheduledEventExecutor() {
50 }
51
52 protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
53 super(parent);
54 }
55
56 @Override
57 public Ticker ticker() {
58 return Ticker.systemTicker();
59 }
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 @Deprecated
75 protected long getCurrentTimeNanos() {
76 return ticker().nanoTime();
77 }
78
79
80
81
82 @Deprecated
83 protected static long nanoTime() {
84 return Ticker.systemTicker().nanoTime();
85 }
86
87
88
89
90 @Deprecated
91 static long defaultCurrentTimeNanos() {
92 return Ticker.systemTicker().nanoTime();
93 }
94
95 static long deadlineNanos(long nanoTime, long delay) {
96 long deadlineNanos = nanoTime + delay;
97
98 return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
99 }
100
101
102
103
104
105
106
107
108 @Deprecated
109 protected static long deadlineToDelayNanos(long deadlineNanos) {
110 return ScheduledFutureTask.deadlineToDelayNanos(defaultCurrentTimeNanos(), deadlineNanos);
111 }
112
113
114
115
116 protected long delayNanos(long currentTimeNanos, long scheduledPurgeInterval) {
117 currentTimeNanos -= ticker().initialNanoTime();
118
119 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
120 if (scheduledTask == null) {
121 return scheduledPurgeInterval;
122 }
123
124 return scheduledTask.delayNanos(currentTimeNanos);
125 }
126
127
128
129
130
131
132 @Deprecated
133 protected static long initialNanoTime() {
134 return Ticker.systemTicker().initialNanoTime();
135 }
136
137 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
138 if (scheduledTaskQueue == null) {
139 scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
140 SCHEDULED_FUTURE_TASK_COMPARATOR,
141
142 11);
143 }
144 return scheduledTaskQueue;
145 }
146
147 private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
148 return queue == null || queue.isEmpty();
149 }
150
151
152
153
154
155
156 protected void cancelScheduledTasks() {
157 assert inEventLoop();
158 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
159 if (isNullOrEmpty(scheduledTaskQueue)) {
160 return;
161 }
162
163 final ScheduledFutureTask<?>[] scheduledTasks =
164 scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);
165
166 for (ScheduledFutureTask<?> task: scheduledTasks) {
167 task.cancelWithoutRemove(false);
168 }
169
170 scheduledTaskQueue.clearIgnoringIndexes();
171 }
172
173
174
175
176 protected final Runnable pollScheduledTask() {
177 return pollScheduledTask(getCurrentTimeNanos());
178 }
179
180
181
182
183
184
185
186
187 protected boolean fetchFromScheduledTaskQueue(Queue<Runnable> taskQueue) {
188 assert inEventLoop();
189 Objects.requireNonNull(taskQueue, "taskQueue");
190 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
191 return true;
192 }
193 long nanoTime = getCurrentTimeNanos();
194 for (;;) {
195 ScheduledFutureTask scheduledTask = (ScheduledFutureTask) pollScheduledTask(nanoTime);
196 if (scheduledTask == null) {
197 return true;
198 }
199 if (scheduledTask.isCancelled()) {
200 continue;
201 }
202 if (!taskQueue.offer(scheduledTask)) {
203
204 scheduledTaskQueue.add(scheduledTask);
205 return false;
206 }
207 }
208 }
209
210
211
212
213
214 protected final Runnable pollScheduledTask(long nanoTime) {
215 assert inEventLoop();
216
217 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
218 if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
219 return null;
220 }
221 scheduledTaskQueue.remove();
222 scheduledTask.setConsumed();
223 return scheduledTask;
224 }
225
226
227
228
229 protected final long nextScheduledTaskNano() {
230 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
231 return scheduledTask != null ? scheduledTask.delayNanos() : -1;
232 }
233
234
235
236
237
238 protected final long nextScheduledTaskDeadlineNanos() {
239 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
240 return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
241 }
242
243 final ScheduledFutureTask<?> peekScheduledTask() {
244 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
245 return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
246 }
247
248
249
250
251 protected final boolean hasScheduledTasks() {
252 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
253 return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
254 }
255
256 @Override
257 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
258 ObjectUtil.checkNotNull(command, "command");
259 ObjectUtil.checkNotNull(unit, "unit");
260 if (delay < 0) {
261 delay = 0;
262 }
263 validateScheduled0(delay, unit);
264
265 return schedule(new ScheduledFutureTask<Void>(
266 this,
267 command,
268 deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
269 }
270
271 @Override
272 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
273 ObjectUtil.checkNotNull(callable, "callable");
274 ObjectUtil.checkNotNull(unit, "unit");
275 if (delay < 0) {
276 delay = 0;
277 }
278 validateScheduled0(delay, unit);
279
280 return schedule(new ScheduledFutureTask<V>(
281 this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
282 }
283
284 @Override
285 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
286 ObjectUtil.checkNotNull(command, "command");
287 ObjectUtil.checkNotNull(unit, "unit");
288 if (initialDelay < 0) {
289 throw new IllegalArgumentException(
290 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
291 }
292 if (period <= 0) {
293 throw new IllegalArgumentException(
294 String.format("period: %d (expected: > 0)", period));
295 }
296 validateScheduled0(initialDelay, unit);
297 validateScheduled0(period, unit);
298
299 return schedule(new ScheduledFutureTask<Void>(
300 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period)));
301 }
302
303 @Override
304 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
305 ObjectUtil.checkNotNull(command, "command");
306 ObjectUtil.checkNotNull(unit, "unit");
307 if (initialDelay < 0) {
308 throw new IllegalArgumentException(
309 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
310 }
311 if (delay <= 0) {
312 throw new IllegalArgumentException(
313 String.format("delay: %d (expected: > 0)", delay));
314 }
315
316 validateScheduled0(initialDelay, unit);
317 validateScheduled0(delay, unit);
318
319 return schedule(new ScheduledFutureTask<Void>(
320 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay)));
321 }
322
323 @SuppressWarnings("deprecation")
324 private void validateScheduled0(long amount, TimeUnit unit) {
325 validateScheduled(amount, unit);
326 }
327
328
329
330
331
332
333 @Deprecated
334 protected void validateScheduled(long amount, TimeUnit unit) {
335
336 }
337
338 final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
339
340 if (task.getId() == 0L) {
341 task.setId(++nextTaskId);
342 }
343 scheduledTaskQueue().add(task);
344 }
345
346 private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
347 if (inEventLoop()) {
348 scheduleFromEventLoop(task);
349 } else {
350 final long deadlineNanos = task.deadlineNanos();
351
352 if (beforeScheduledTaskSubmitted(deadlineNanos)) {
353 execute(task);
354 } else {
355 lazyExecute(task);
356
357 if (afterScheduledTaskSubmitted(deadlineNanos)) {
358 execute(WAKEUP_TASK);
359 }
360 }
361 }
362
363 return task;
364 }
365
366 final void removeScheduled(final ScheduledFutureTask<?> task) {
367 assert task.isCancelled();
368 if (inEventLoop()) {
369 scheduledTaskQueue().removeTyped(task);
370 } else {
371
372 scheduleRemoveScheduled(task);
373 }
374 }
375
376 void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
377
378 lazyExecute(task);
379 }
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394 protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
395 return true;
396 }
397
398
399
400
401
402
403
404 protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
405 return true;
406 }
407 }