1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.DefaultPriorityQueue;
19 import io.netty.util.internal.ObjectUtil;
20 import io.netty.util.internal.PriorityQueue;
21
22 import java.util.Comparator;
23 import java.util.Objects;
24 import java.util.Queue;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.TimeUnit;
27
28
29
30
31 public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
32 private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
33 new Comparator<ScheduledFutureTask<?>>() {
34 @Override
35 public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
36 return o1.compareTo(o2);
37 }
38 };
39
40 private static final long START_TIME = System.nanoTime();
41
42 static final Runnable WAKEUP_TASK = new Runnable() {
43 @Override
44 public void run() { }
45 };
46
47 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
48
49 long nextTaskId;
50
51 protected AbstractScheduledEventExecutor() {
52 }
53
54 protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
55 super(parent);
56 }
57
58
59
60
61
62
63
64
65
66
67
68 protected long getCurrentTimeNanos() {
69 return defaultCurrentTimeNanos();
70 }
71
72
73
74
75 @Deprecated
76 protected static long nanoTime() {
77 return defaultCurrentTimeNanos();
78 }
79
80 static long defaultCurrentTimeNanos() {
81 return System.nanoTime() - START_TIME;
82 }
83
84 static long deadlineNanos(long nanoTime, long delay) {
85 long deadlineNanos = nanoTime + delay;
86
87 return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
88 }
89
90
91
92
93
94
95
96 protected static long deadlineToDelayNanos(long deadlineNanos) {
97 return ScheduledFutureTask.deadlineToDelayNanos(defaultCurrentTimeNanos(), deadlineNanos);
98 }
99
100
101
102
103 protected long delayNanos(long currentTimeNanos, long scheduledPurgeInterval) {
104 currentTimeNanos -= initialNanoTime();
105
106 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
107 if (scheduledTask == null) {
108 return scheduledPurgeInterval;
109 }
110
111 return scheduledTask.delayNanos(currentTimeNanos);
112 }
113
114
115
116
117
118 protected static long initialNanoTime() {
119 return START_TIME;
120 }
121
122 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
123 if (scheduledTaskQueue == null) {
124 scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
125 SCHEDULED_FUTURE_TASK_COMPARATOR,
126
127 11);
128 }
129 return scheduledTaskQueue;
130 }
131
132 private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
133 return queue == null || queue.isEmpty();
134 }
135
136
137
138
139
140
141 protected void cancelScheduledTasks() {
142 assert inEventLoop();
143 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
144 if (isNullOrEmpty(scheduledTaskQueue)) {
145 return;
146 }
147
148 final ScheduledFutureTask<?>[] scheduledTasks =
149 scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);
150
151 for (ScheduledFutureTask<?> task: scheduledTasks) {
152 task.cancelWithoutRemove(false);
153 }
154
155 scheduledTaskQueue.clearIgnoringIndexes();
156 }
157
158
159
160
161 protected final Runnable pollScheduledTask() {
162 return pollScheduledTask(getCurrentTimeNanos());
163 }
164
165
166
167
168
169
170
171
172 protected boolean fetchFromScheduledTaskQueue(Queue<Runnable> taskQueue) {
173 assert inEventLoop();
174 Objects.requireNonNull(taskQueue, "taskQueue");
175 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
176 return true;
177 }
178 long nanoTime = getCurrentTimeNanos();
179 for (;;) {
180 Runnable scheduledTask = pollScheduledTask(nanoTime);
181 if (scheduledTask == null) {
182 return true;
183 }
184 if (!taskQueue.offer(scheduledTask)) {
185
186 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
187 return false;
188 }
189 }
190 }
191
192
193
194
195
196 protected final Runnable pollScheduledTask(long nanoTime) {
197 assert inEventLoop();
198
199 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
200 if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
201 return null;
202 }
203 scheduledTaskQueue.remove();
204 scheduledTask.setConsumed();
205 return scheduledTask;
206 }
207
208
209
210
211 protected final long nextScheduledTaskNano() {
212 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
213 return scheduledTask != null ? scheduledTask.delayNanos() : -1;
214 }
215
216
217
218
219
220 protected final long nextScheduledTaskDeadlineNanos() {
221 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
222 return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
223 }
224
225 final ScheduledFutureTask<?> peekScheduledTask() {
226 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
227 return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
228 }
229
230
231
232
233 protected final boolean hasScheduledTasks() {
234 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
235 return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
236 }
237
238 @Override
239 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
240 ObjectUtil.checkNotNull(command, "command");
241 ObjectUtil.checkNotNull(unit, "unit");
242 if (delay < 0) {
243 delay = 0;
244 }
245 validateScheduled0(delay, unit);
246
247 return schedule(new ScheduledFutureTask<Void>(
248 this,
249 command,
250 deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
251 }
252
253 @Override
254 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
255 ObjectUtil.checkNotNull(callable, "callable");
256 ObjectUtil.checkNotNull(unit, "unit");
257 if (delay < 0) {
258 delay = 0;
259 }
260 validateScheduled0(delay, unit);
261
262 return schedule(new ScheduledFutureTask<V>(
263 this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
264 }
265
266 @Override
267 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
268 ObjectUtil.checkNotNull(command, "command");
269 ObjectUtil.checkNotNull(unit, "unit");
270 if (initialDelay < 0) {
271 throw new IllegalArgumentException(
272 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
273 }
274 if (period <= 0) {
275 throw new IllegalArgumentException(
276 String.format("period: %d (expected: > 0)", period));
277 }
278 validateScheduled0(initialDelay, unit);
279 validateScheduled0(period, unit);
280
281 return schedule(new ScheduledFutureTask<Void>(
282 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period)));
283 }
284
285 @Override
286 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
287 ObjectUtil.checkNotNull(command, "command");
288 ObjectUtil.checkNotNull(unit, "unit");
289 if (initialDelay < 0) {
290 throw new IllegalArgumentException(
291 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
292 }
293 if (delay <= 0) {
294 throw new IllegalArgumentException(
295 String.format("delay: %d (expected: > 0)", delay));
296 }
297
298 validateScheduled0(initialDelay, unit);
299 validateScheduled0(delay, unit);
300
301 return schedule(new ScheduledFutureTask<Void>(
302 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay)));
303 }
304
305 @SuppressWarnings("deprecation")
306 private void validateScheduled0(long amount, TimeUnit unit) {
307 validateScheduled(amount, unit);
308 }
309
310
311
312
313
314
315 @Deprecated
316 protected void validateScheduled(long amount, TimeUnit unit) {
317
318 }
319
320 final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
321
322 scheduledTaskQueue().add(task.setId(++nextTaskId));
323 }
324
325 private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
326 if (inEventLoop()) {
327 scheduleFromEventLoop(task);
328 } else {
329 final long deadlineNanos = task.deadlineNanos();
330
331 if (beforeScheduledTaskSubmitted(deadlineNanos)) {
332 execute(task);
333 } else {
334 lazyExecute(task);
335
336 if (afterScheduledTaskSubmitted(deadlineNanos)) {
337 execute(WAKEUP_TASK);
338 }
339 }
340 }
341
342 return task;
343 }
344
345 final void removeScheduled(final ScheduledFutureTask<?> task) {
346 assert task.isCancelled();
347 if (inEventLoop()) {
348 scheduledTaskQueue().removeTyped(task);
349 } else {
350
351 scheduleRemoveScheduled(task);
352 }
353 }
354
355 void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
356
357 lazyExecute(task);
358 }
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373 protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
374 return true;
375 }
376
377
378
379
380
381
382
383 protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
384 return true;
385 }
386 }