1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.DefaultPriorityQueue;
19 import io.netty5.util.internal.PriorityQueue;
20 import io.netty5.util.internal.PriorityQueueNode;
21
22 import java.util.Comparator;
23 import java.util.Queue;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.TimeUnit;
26
27 import static java.util.Objects.requireNonNull;
28 import static java.util.concurrent.Executors.callable;
29
30
31
32
33 public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
34 static final long START_TIME = System.nanoTime();
35
36 private static final Comparator<RunnableScheduledFutureNode<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
37 Comparable::compareTo;
38 private static final RunnableScheduledFutureNode<?>[]
39 EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES = new RunnableScheduledFutureNode<?>[0];
40
41 private PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue;
42
43 protected AbstractScheduledEventExecutor() {
44 }
45
46
47
48
49
50 public static long nanoTime() {
51 return defaultCurrentTimeNanos();
52 }
53
54
55
56
57
58 protected static long initialNanoTime() {
59 return START_TIME;
60 }
61
62
63
64
65
66
67
68
69
70
71
72 protected long getCurrentTimeNanos() {
73 return defaultCurrentTimeNanos();
74 }
75
76 static long defaultCurrentTimeNanos() {
77 return System.nanoTime() - START_TIME;
78 }
79
80 static long deadlineNanos(long nanoTime, long delay) {
81 long deadlineNanos = nanoTime + delay;
82
83 return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
84 }
85
86 PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue() {
87 if (scheduledTaskQueue == null) {
88 scheduledTaskQueue = new DefaultPriorityQueue<>(
89 SCHEDULED_FUTURE_TASK_COMPARATOR,
90
91 11);
92 }
93 return scheduledTaskQueue;
94 }
95
96 private static boolean isNullOrEmpty(Queue<RunnableScheduledFutureNode<?>> queue) {
97 return queue == null || queue.isEmpty();
98 }
99
100
101
102
103
104
105 protected final void cancelScheduledTasks() {
106 assert inEventLoop();
107 PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
108 if (isNullOrEmpty(scheduledTaskQueue)) {
109 return;
110 }
111
112 final RunnableScheduledFutureNode<?>[] scheduledTasks =
113 scheduledTaskQueue.toArray(EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES);
114
115 for (RunnableScheduledFutureNode<?> task : scheduledTasks) {
116 task.cancel();
117 }
118
119 scheduledTaskQueue.clearIgnoringIndexes();
120 }
121
122
123
124
125 protected final RunnableScheduledFuture<?> pollScheduledTask() {
126 return pollScheduledTask(getCurrentTimeNanos());
127 }
128
129
130
131
132
133
134
135 protected final RunnableScheduledFuture<?> pollScheduledTask(long nanoTime) {
136 assert inEventLoop();
137
138 Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
139 RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
140 if (scheduledTask == null) {
141 return null;
142 }
143
144 if (scheduledTask.deadlineNanos() <= nanoTime) {
145 scheduledTaskQueue.remove();
146 return scheduledTask;
147 }
148 return null;
149 }
150
151
152
153
154
155
156 protected final long nextScheduledTaskNano() {
157 Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
158 RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
159 if (scheduledTask == null) {
160 return -1;
161 }
162 return Math.max(0, scheduledTask.deadlineNanos() - getCurrentTimeNanos());
163 }
164
165 final RunnableScheduledFuture<?> peekScheduledTask() {
166 Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
167 if (scheduledTaskQueue == null) {
168 return null;
169 }
170 return scheduledTaskQueue.peek();
171 }
172
173
174
175
176
177
178 protected final boolean hasScheduledTasks() {
179 assert inEventLoop();
180 Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
181 RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
182 return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
183 }
184
185 @Override
186 public Future<Void> schedule(Runnable command, long delay, TimeUnit unit) {
187 requireNonNull(command, "command");
188 requireNonNull(unit, "unit");
189 if (delay < 0) {
190 delay = 0;
191 }
192 RunnableScheduledFuture<Void> task = newScheduledTaskFor(
193 callable(command, null), deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)), 0);
194 return schedule(task);
195 }
196
197 @Override
198 public <V> Future<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
199 requireNonNull(callable, "callable");
200 requireNonNull(unit, "unit");
201 if (delay < 0) {
202 delay = 0;
203 }
204 RunnableScheduledFuture<V> task = newScheduledTaskFor(
205 callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)), 0);
206 return schedule(task);
207 }
208
209 @Override
210 public Future<Void> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
211 requireNonNull(command, "command");
212 requireNonNull(unit, "unit");
213 if (initialDelay < 0) {
214 throw new IllegalArgumentException(
215 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
216 }
217 if (period <= 0) {
218 throw new IllegalArgumentException(
219 String.format("period: %d (expected: > 0)", period));
220 }
221
222 RunnableScheduledFuture<Void> task = newScheduledTaskFor(
223 callable(command, null),
224 deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period));
225 return schedule(task);
226 }
227
228 @Override
229 public Future<Void> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
230 requireNonNull(command, "command");
231 requireNonNull(unit, "unit");
232 if (initialDelay < 0) {
233 throw new IllegalArgumentException(
234 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
235 }
236 if (delay <= 0) {
237 throw new IllegalArgumentException(
238 String.format("delay: %d (expected: > 0)", delay));
239 }
240
241 RunnableScheduledFuture<Void> task = newScheduledTaskFor(
242 callable(command, null),
243 deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay));
244 return schedule(task);
245 }
246
247
248
249
250 protected final <V> Future<V> schedule(final RunnableScheduledFuture<V> task) {
251 if (inEventLoop()) {
252 add0(task);
253 } else {
254 execute(() -> add0(task));
255 }
256 return task;
257 }
258
259 private <V> void add0(RunnableScheduledFuture<V> task) {
260 final RunnableScheduledFutureNode<V> node;
261 if (task instanceof RunnableScheduledFutureNode) {
262 node = (RunnableScheduledFutureNode<V>) task;
263 } else {
264 node = new DefaultRunnableScheduledFutureNode<>(task);
265 }
266 scheduledTaskQueue().add(node);
267 }
268
269 final void removeScheduled(final RunnableScheduledFutureNode<?> task) {
270 if (inEventLoop()) {
271 scheduledTaskQueue().removeTyped(task);
272 } else {
273 execute(() -> removeScheduled(task));
274 }
275 }
276
277
278
279
280
281
282
283 protected static <V> RunnableScheduledFuture<V> newRunnableScheduledFuture(
284 AbstractScheduledEventExecutor executor, Promise<V> promise, Callable<V> task,
285 long deadlineNanos, long periodNanos) {
286 return new RunnableScheduledFutureAdapter<>(executor, promise, task, deadlineNanos, periodNanos);
287 }
288
289
290
291
292 protected <V> RunnableScheduledFuture<V> newScheduledTaskFor(
293 Callable<V> callable, long deadlineNanos, long period) {
294 return newRunnableScheduledFuture(this, newPromise(), callable, deadlineNanos, period);
295 }
296
297 interface RunnableScheduledFutureNode<V> extends PriorityQueueNode, RunnableScheduledFuture<V> {
298 }
299
300 private static final class DefaultRunnableScheduledFutureNode<V> implements RunnableScheduledFutureNode<V> {
301 private final RunnableScheduledFuture<V> future;
302 private int queueIndex = INDEX_NOT_IN_QUEUE;
303
304 DefaultRunnableScheduledFutureNode(RunnableScheduledFuture<V> future) {
305 this.future = future;
306 }
307
308 @Override
309 public EventExecutor executor() {
310 return future.executor();
311 }
312
313 @Override
314 public long deadlineNanos() {
315 return future.deadlineNanos();
316 }
317
318 @Override
319 public long delayNanos() {
320 return future.delayNanos();
321 }
322
323 @Override
324 public long delayNanos(long currentTimeNanos) {
325 return future.delayNanos(currentTimeNanos);
326 }
327
328 @Override
329 public RunnableScheduledFuture<V> addListener(FutureListener<? super V> listener) {
330 future.addListener(listener);
331 return this;
332 }
333
334 @Override
335 public <C> RunnableScheduledFuture<V> addListener(
336 C context, FutureContextListener<? super C, ? super V> listener) {
337 future.addListener(context, listener);
338 return this;
339 }
340
341 @Override
342 public boolean isPeriodic() {
343 return future.isPeriodic();
344 }
345
346 @Override
347 public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
348 return queueIndex;
349 }
350
351 @Override
352 public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
353 queueIndex = i;
354 }
355
356 @Override
357 public void run() {
358 future.run();
359 }
360
361 @Override
362 public boolean cancel() {
363 return future.cancel();
364 }
365
366 @Override
367 public boolean isCancelled() {
368 return future.isCancelled();
369 }
370
371 @Override
372 public boolean isDone() {
373 return future.isDone();
374 }
375
376 @Override
377 public FutureCompletionStage<V> asStage() {
378 return future.asStage();
379 }
380
381 @Override
382 public int compareTo(RunnableScheduledFuture<?> o) {
383 return future.compareTo(o);
384 }
385
386 @Override
387 public boolean isSuccess() {
388 return future.isSuccess();
389 }
390
391 @Override
392 public boolean isFailed() {
393 return future.isFailed();
394 }
395
396 @Override
397 public boolean isCancellable() {
398 return future.isCancellable();
399 }
400
401 @Override
402 public Throwable cause() {
403 return future.cause();
404 }
405
406 @Override
407 public V getNow() {
408 return future.getNow();
409 }
410 }
411 }