1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.StringUtil;
19
20 import java.util.Objects;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletableFuture;
23 import java.util.concurrent.CompletionException;
24 import java.util.concurrent.CompletionStage;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.TimeUnit;
27 import java.util.function.BiConsumer;
28 import java.util.function.BiFunction;
29 import java.util.function.Consumer;
30 import java.util.function.Function;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 public interface FutureCompletionStage<V>
47 extends CompletionStage<V>, java.util.concurrent.Future<V>, AsynchronousResult<V> {
48
49
50
51
52
53
54
55
56 FutureCompletionStage<V> sync() throws InterruptedException;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 default <T> T join(BiFunction<V, Throwable, T> resultHandler) throws InterruptedException {
77 Objects.requireNonNull(resultHandler, "resultHandler");
78 await();
79 var fut = future();
80 if (fut.isSuccess()) {
81 return resultHandler.apply(fut.getNow(), null);
82 } else {
83 return resultHandler.apply(null, fut.cause());
84 }
85 }
86
87
88
89
90
91
92 FutureCompletionStage<V> await() throws InterruptedException;
93
94
95
96
97
98
99
100 boolean await(long timeout, TimeUnit unit) throws InterruptedException;
101
102
103
104
105
106
107
108 default Throwable getCause() throws InterruptedException {
109 await();
110 return cause();
111 }
112
113
114
115
116 Future<V> future();
117
118 @Override
119 default boolean cancel() {
120 return future().cancel();
121 }
122
123 @Override
124 default boolean isSuccess() {
125 return future().isSuccess();
126 }
127
128 @Override
129 default boolean isFailed() {
130 return future().isFailed();
131 }
132
133 @Override
134 default boolean isCancellable() {
135 return future().isCancellable();
136 }
137
138 @Override
139 default V getNow() {
140 return future().getNow();
141 }
142
143 @Override
144 default Throwable cause() {
145 return future().cause();
146 }
147
148 @Override
149 default EventExecutor executor() {
150 return future().executor();
151 }
152
153
154
155
156 @Override
157 default CompletableFuture<V> toCompletableFuture() {
158 throw new UnsupportedOperationException("Not supported by "
159 + StringUtil.simpleClassName(FutureCompletionStage.class));
160 }
161
162 @Override
163 <U> FutureCompletionStage<U> thenApply(Function<? super V, ? extends U> fn);
164
165 @Override
166 <U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn);
167
168 @Override
169 FutureCompletionStage<Void> thenAccept(Consumer<? super V> action);
170
171 @Override
172 FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action);
173
174 @Override
175 FutureCompletionStage<Void> thenRun(Runnable action);
176
177 @Override
178 FutureCompletionStage<Void> thenRunAsync(Runnable action);
179
180 @Override
181 <U, V1> FutureCompletionStage<V1> thenCombine(
182 CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn);
183
184 @Override
185 <U, V1> FutureCompletionStage<V1> thenCombineAsync(
186 CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn);
187
188 @Override
189 <U> FutureCompletionStage<Void> thenAcceptBoth(
190 CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action);
191
192 @Override
193 <U> FutureCompletionStage<Void> thenAcceptBothAsync(
194 CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action);
195
196 @Override
197 FutureCompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
198
199 @Override
200 FutureCompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
201
202 @Override
203 <U> FutureCompletionStage<U> applyToEither(CompletionStage<? extends V> other, Function<? super V, U> fn);
204
205 @Override
206 <U> FutureCompletionStage<U> applyToEitherAsync(CompletionStage<? extends V> other, Function<? super V, U> fn);
207
208 @Override
209 FutureCompletionStage<Void> acceptEither(CompletionStage<? extends V> other, Consumer<? super V> action);
210
211 @Override
212 FutureCompletionStage<Void> acceptEitherAsync(CompletionStage<? extends V> other, Consumer<? super V> action);
213
214 @Override
215 FutureCompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
216
217 @Override
218 FutureCompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
219
220 @Override
221 <U> FutureCompletionStage<U> thenCompose(Function<? super V, ? extends CompletionStage<U>> fn);
222
223 @Override
224 <U> FutureCompletionStage<U> thenComposeAsync(Function<? super V, ? extends CompletionStage<U>> fn);
225
226 @Override
227 FutureCompletionStage<V> whenComplete(BiConsumer<? super V, ? super Throwable> action);
228
229 @Override
230 FutureCompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action);
231
232 @Override
233 <U> FutureCompletionStage<U> handle(BiFunction<? super V, Throwable, ? extends U> fn);
234
235 @Override
236 <U> FutureCompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn);
237
238 @Override
239 <U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn, Executor executor);
240
241 @Override
242 FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action, Executor executor);
243
244 @Override
245 FutureCompletionStage<Void> thenRunAsync(Runnable action, Executor executor);
246
247 @Override
248 <U, V1> FutureCompletionStage<V1> thenCombineAsync(
249 CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn, Executor executor);
250
251 @Override
252 <U> FutureCompletionStage<Void> thenAcceptBothAsync(
253 CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action, Executor executor);
254
255 @Override
256 FutureCompletionStage<Void> runAfterBothAsync(
257 CompletionStage<?> other, Runnable action, Executor executor);
258
259 @Override
260 <U> FutureCompletionStage<U> applyToEitherAsync(
261 CompletionStage<? extends V> other, Function<? super V, U> fn, Executor executor);
262
263 @Override
264 FutureCompletionStage<Void> acceptEitherAsync(
265 CompletionStage<? extends V> other, Consumer<? super V> action, Executor executor);
266
267 @Override
268 FutureCompletionStage<Void> runAfterEitherAsync(
269 CompletionStage<?> other, Runnable action, Executor executor);
270
271 @Override
272 <U> FutureCompletionStage<U> thenComposeAsync(
273 Function<? super V, ? extends CompletionStage<U>> fn, Executor executor);
274
275 @Override
276 FutureCompletionStage<V> exceptionally(Function<Throwable, ? extends V> fn);
277
278 @Override
279 FutureCompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action, Executor executor);
280
281 @Override
282 <U> FutureCompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn, Executor executor);
283
284
285
286
287
288 static <U> FutureCompletionStage<U> toFutureCompletionStage(CompletionStage<U> stage, EventExecutor executor) {
289 Objects.requireNonNull(stage, "stage");
290 Objects.requireNonNull(executor, "executor");
291 if (stage instanceof FutureCompletionStage && ((FutureCompletionStage<?>) stage).executor() == executor) {
292 return (FutureCompletionStage<U>) stage;
293 }
294
295
296 if (stage instanceof CompletableFuture) {
297 CompletableFuture<U> future = (CompletableFuture<U>) stage;
298 if (future.isDone() && !future.isCompletedExceptionally()) {
299 return executor.newSucceededFuture(future.getNow(null)).asStage();
300 }
301 }
302
303 Promise<U> promise = executor.newPromise();
304 stage.whenComplete((v, cause) -> {
305 if (cause != null) {
306 promise.setFailure(cause);
307 } else {
308 promise.setSuccess(v);
309 }
310 });
311 return promise.asFuture().asStage();
312 }
313 }