1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.DefaultPriorityQueue;
19 import io.netty.util.internal.ObjectUtil;
20 import io.netty.util.internal.PriorityQueue;
21
22 import java.util.Comparator;
23 import java.util.Objects;
24 import java.util.Queue;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.TimeUnit;
27
28
29
30
31 public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
32 private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
33 new Comparator<ScheduledFutureTask<?>>() {
34 @Override
35 public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
36 return o1.compareTo(o2);
37 }
38 };
39
40 static final Runnable WAKEUP_TASK = new Runnable() {
41 @Override
42 public void run() { }
43 };
44
45 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
46
47 long nextTaskId;
48
49 protected AbstractScheduledEventExecutor() {
50 }
51
52 protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
53 super(parent);
54 }
55
56 @Override
57 public Ticker ticker() {
58 return Ticker.systemTicker();
59 }
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 @Deprecated
75 protected long getCurrentTimeNanos() {
76 return ticker().nanoTime();
77 }
78
79
80
81
82 @Deprecated
83 protected static long nanoTime() {
84 return Ticker.systemTicker().nanoTime();
85 }
86
87
88
89
90 @Deprecated
91 static long defaultCurrentTimeNanos() {
92 return Ticker.systemTicker().nanoTime();
93 }
94
95 static long deadlineNanos(long nanoTime, long delay) {
96 long deadlineNanos = nanoTime + delay;
97
98 return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
99 }
100
101
102
103
104
105
106
107
108 @Deprecated
109 protected static long deadlineToDelayNanos(long deadlineNanos) {
110 return ScheduledFutureTask.deadlineToDelayNanos(defaultCurrentTimeNanos(), deadlineNanos);
111 }
112
113
114
115
116 protected long delayNanos(long currentTimeNanos, long scheduledPurgeInterval) {
117 currentTimeNanos -= ticker().initialNanoTime();
118
119 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
120 if (scheduledTask == null) {
121 return scheduledPurgeInterval;
122 }
123
124 return scheduledTask.delayNanos(currentTimeNanos);
125 }
126
127
128
129
130
131
132 @Deprecated
133 protected static long initialNanoTime() {
134 return Ticker.systemTicker().initialNanoTime();
135 }
136
137 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
138 if (scheduledTaskQueue == null) {
139 scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
140 SCHEDULED_FUTURE_TASK_COMPARATOR,
141
142 11);
143 }
144 return scheduledTaskQueue;
145 }
146
147 private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
148 return queue == null || queue.isEmpty();
149 }
150
151
152
153
154
155
156 protected void cancelScheduledTasks() {
157 assert inEventLoop();
158 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
159 if (isNullOrEmpty(scheduledTaskQueue)) {
160 return;
161 }
162
163 final ScheduledFutureTask<?>[] scheduledTasks =
164 scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);
165
166 for (ScheduledFutureTask<?> task: scheduledTasks) {
167 task.cancelWithoutRemove(false);
168 }
169
170 scheduledTaskQueue.clearIgnoringIndexes();
171 }
172
173
174
175
176 protected final Runnable pollScheduledTask() {
177 return pollScheduledTask(getCurrentTimeNanos());
178 }
179
180
181
182
183
184
185
186
187 protected boolean fetchFromScheduledTaskQueue(Queue<Runnable> taskQueue) {
188 assert inEventLoop();
189 Objects.requireNonNull(taskQueue, "taskQueue");
190 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
191 return true;
192 }
193 long nanoTime = getCurrentTimeNanos();
194 for (;;) {
195 Runnable scheduledTask = pollScheduledTask(nanoTime);
196 if (scheduledTask == null) {
197 return true;
198 }
199 if (!taskQueue.offer(scheduledTask)) {
200
201 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
202 return false;
203 }
204 }
205 }
206
207
208
209
210
211 protected final Runnable pollScheduledTask(long nanoTime) {
212 assert inEventLoop();
213
214 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
215 if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
216 return null;
217 }
218 scheduledTaskQueue.remove();
219 scheduledTask.setConsumed();
220 return scheduledTask;
221 }
222
223
224
225
226 protected final long nextScheduledTaskNano() {
227 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
228 return scheduledTask != null ? scheduledTask.delayNanos() : -1;
229 }
230
231
232
233
234
235 protected final long nextScheduledTaskDeadlineNanos() {
236 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
237 return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
238 }
239
240 final ScheduledFutureTask<?> peekScheduledTask() {
241 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
242 return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
243 }
244
245
246
247
248 protected final boolean hasScheduledTasks() {
249 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
250 return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
251 }
252
253 @Override
254 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
255 ObjectUtil.checkNotNull(command, "command");
256 ObjectUtil.checkNotNull(unit, "unit");
257 if (delay < 0) {
258 delay = 0;
259 }
260 validateScheduled0(delay, unit);
261
262 return schedule(new ScheduledFutureTask<Void>(
263 this,
264 command,
265 deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
266 }
267
268 @Override
269 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
270 ObjectUtil.checkNotNull(callable, "callable");
271 ObjectUtil.checkNotNull(unit, "unit");
272 if (delay < 0) {
273 delay = 0;
274 }
275 validateScheduled0(delay, unit);
276
277 return schedule(new ScheduledFutureTask<V>(
278 this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
279 }
280
281 @Override
282 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
283 ObjectUtil.checkNotNull(command, "command");
284 ObjectUtil.checkNotNull(unit, "unit");
285 if (initialDelay < 0) {
286 throw new IllegalArgumentException(
287 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
288 }
289 if (period <= 0) {
290 throw new IllegalArgumentException(
291 String.format("period: %d (expected: > 0)", period));
292 }
293 validateScheduled0(initialDelay, unit);
294 validateScheduled0(period, unit);
295
296 return schedule(new ScheduledFutureTask<Void>(
297 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period)));
298 }
299
300 @Override
301 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
302 ObjectUtil.checkNotNull(command, "command");
303 ObjectUtil.checkNotNull(unit, "unit");
304 if (initialDelay < 0) {
305 throw new IllegalArgumentException(
306 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
307 }
308 if (delay <= 0) {
309 throw new IllegalArgumentException(
310 String.format("delay: %d (expected: > 0)", delay));
311 }
312
313 validateScheduled0(initialDelay, unit);
314 validateScheduled0(delay, unit);
315
316 return schedule(new ScheduledFutureTask<Void>(
317 this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay)));
318 }
319
320 @SuppressWarnings("deprecation")
321 private void validateScheduled0(long amount, TimeUnit unit) {
322 validateScheduled(amount, unit);
323 }
324
325
326
327
328
329
330 @Deprecated
331 protected void validateScheduled(long amount, TimeUnit unit) {
332
333 }
334
335 final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
336
337 scheduledTaskQueue().add(task.setId(++nextTaskId));
338 }
339
340 private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
341 if (inEventLoop()) {
342 scheduleFromEventLoop(task);
343 } else {
344 final long deadlineNanos = task.deadlineNanos();
345
346 if (beforeScheduledTaskSubmitted(deadlineNanos)) {
347 execute(task);
348 } else {
349 lazyExecute(task);
350
351 if (afterScheduledTaskSubmitted(deadlineNanos)) {
352 execute(WAKEUP_TASK);
353 }
354 }
355 }
356
357 return task;
358 }
359
360 final void removeScheduled(final ScheduledFutureTask<?> task) {
361 assert task.isCancelled();
362 if (inEventLoop()) {
363 scheduledTaskQueue().removeTyped(task);
364 } else {
365
366 scheduleRemoveScheduled(task);
367 }
368 }
369
370 void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
371
372 lazyExecute(task);
373 }
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388 protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
389 return true;
390 }
391
392
393
394
395
396
397
398 protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
399 return true;
400 }
401 }