1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty5.util.concurrent;
18
19 import io.netty5.util.internal.DefaultPriorityQueue;
20 import io.netty5.util.internal.StringUtil;
21
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.atomic.AtomicLong;
24
25 import static java.util.Objects.requireNonNull;
26
27 final class RunnableScheduledFutureAdapter<V> implements AbstractScheduledEventExecutor.RunnableScheduledFutureNode<V> {
28 private static final AtomicLong NEXT_TASK_ID = new AtomicLong();
29
30 private final long id = NEXT_TASK_ID.getAndIncrement();
31 private long deadlineNanos;
32
33 private final long periodNanos;
34
35 private int queueIndex = INDEX_NOT_IN_QUEUE;
36
37 private final AbstractScheduledEventExecutor executor;
38 private final Promise<V> promise;
39 private final Future<V> future;
40 private final Callable<V> callable;
41
42 RunnableScheduledFutureAdapter(AbstractScheduledEventExecutor executor, Promise<V> promise, Callable<V> callable,
43 long deadlineNanos, long periodNanos) {
44 this.executor = requireNonNull(executor, "executor");
45 this.promise = requireNonNull(promise, "promise");
46 future = promise.asFuture();
47 this.callable = requireNonNull(callable, "callable");
48 this.deadlineNanos = deadlineNanos;
49 this.periodNanos = periodNanos;
50 }
51
52 @Override
53 public EventExecutor executor() {
54 return executor;
55 }
56
57 @Override
58 public long deadlineNanos() {
59 return deadlineNanos;
60 }
61
62 @Override
63 public long delayNanos() {
64 return delayNanos(executor.getCurrentTimeNanos());
65 }
66
67 @Override
68 public long delayNanos(long currentTimeNanos) {
69 return deadlineToDelayNanos(currentTimeNanos, deadlineNanos);
70 }
71
72 private static long deadlineToDelayNanos(long currentTimeNanos, long deadlineNanos) {
73 return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - currentTimeNanos);
74 }
75
76 @Override
77 public int compareTo(RunnableScheduledFuture<?> o) {
78 if (this == o) {
79 return 0;
80 }
81
82 RunnableScheduledFutureAdapter<?> that = (RunnableScheduledFutureAdapter<?>) o;
83 long d = deadlineNanos() - that.deadlineNanos();
84 if (d < 0) {
85 return -1;
86 } else if (d > 0) {
87 return 1;
88 } else if (id < that.id) {
89 return -1;
90 } else if (id == that.id) {
91 throw new Error();
92 } else {
93 return 1;
94 }
95 }
96
97 @Override
98 public int hashCode() {
99 return Long.hashCode(id);
100 }
101
102 @Override
103 public boolean equals(Object obj) {
104 if (this == obj) {
105 return true;
106 }
107 if (obj instanceof RunnableScheduledFutureAdapter) {
108 RunnableScheduledFutureAdapter<?> adaptor = (RunnableScheduledFutureAdapter<?>) obj;
109 return id == adaptor.id;
110 }
111 return false;
112 }
113
114 @Override
115 public void run() {
116 try {
117 if (!isPeriodic()) {
118 if (promise.setUncancellable()) {
119 V result = callable.call();
120 promise.setSuccess(result);
121 }
122 } else {
123
124 if (!isCancelled()) {
125 callable.call();
126 if (!executor.isShutdown()) {
127 long p = periodNanos;
128 if (p > 0) {
129 deadlineNanos += p;
130 } else {
131 deadlineNanos = executor.getCurrentTimeNanos() - p;
132 }
133 if (!isCancelled()) {
134 executor.schedule(this);
135 }
136 }
137 }
138 }
139 } catch (Throwable cause) {
140 promise.setFailure(cause);
141 }
142 }
143
144 @Override
145 public boolean cancel() {
146 boolean canceled = future.cancel();
147 if (canceled) {
148 executor.removeScheduled(this);
149 }
150 return canceled;
151 }
152
153 @Override
154 public boolean isSuccess() {
155 return promise.isSuccess();
156 }
157
158 @Override
159 public boolean isFailed() {
160 return promise.isFailed();
161 }
162
163 @Override
164 public boolean isCancellable() {
165 return promise.isCancellable();
166 }
167
168 @Override
169 public Throwable cause() {
170 return promise.cause();
171 }
172
173 @Override
174 public RunnableScheduledFuture<V> addListener(FutureListener<? super V> listener) {
175 future.addListener(listener);
176 return this;
177 }
178
179 @Override
180 public <C> RunnableScheduledFuture<V> addListener(C context, FutureContextListener<? super C, ? super V> listener) {
181 future.addListener(context, listener);
182 return this;
183 }
184
185 @Override
186 public V getNow() {
187 return promise.getNow();
188 }
189
190 @Override
191 public boolean isPeriodic() {
192 return periodNanos != 0;
193 }
194
195 @Override
196 public boolean isCancelled() {
197 return future.isCancelled();
198 }
199
200 @Override
201 public boolean isDone() {
202 return promise.isDone();
203 }
204
205 @Override
206 public FutureCompletionStage<V> asStage() {
207 return future.asStage();
208 }
209
210 @Override
211 public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
212 return queueIndex;
213 }
214
215 @Override
216 public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
217 queueIndex = i;
218 }
219
220 @Override
221 public String toString() {
222 StringBuilder buf = new StringBuilder(64)
223 .append(StringUtil.simpleClassName(this))
224 .append('@')
225 .append(Integer.toHexString(hashCode()));
226
227 if (!isDone()) {
228 buf.append("(incomplete)");
229 } else {
230 Throwable cause = cause();
231 if (cause != null) {
232 buf.append("(failure: ")
233 .append(cause)
234 .append(')');
235 } else {
236 Object result = getNow();
237 if (result == null) {
238 buf.append("(success)");
239 } else {
240 buf.append("(success: ")
241 .append(result)
242 .append(')');
243 }
244 }
245 }
246 return buf.append(" task: ")
247 .append(callable)
248 .append(", id: ")
249 .append(id)
250 .append(", deadline: ")
251 .append(deadlineNanos)
252 .append(", period: ")
253 .append(periodNanos)
254 .append(')').toString();
255 }
256 }