1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package io.netty5.util.concurrent;
17  
18  import io.netty5.util.internal.StringUtil;
19  
20  import java.util.Objects;
21  import java.util.concurrent.CancellationException;
22  import java.util.concurrent.CompletableFuture;
23  import java.util.concurrent.CompletionException;
24  import java.util.concurrent.CompletionStage;
25  import java.util.concurrent.Executor;
26  import java.util.concurrent.TimeUnit;
27  import java.util.function.BiConsumer;
28  import java.util.function.BiFunction;
29  import java.util.function.Consumer;
30  import java.util.function.Function;
31  
32  
33  
34  
35  
36  
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  public interface FutureCompletionStage<V>
47          extends CompletionStage<V>, java.util.concurrent.Future<V>, AsynchronousResult<V> {
48  
49      
50  
51  
52  
53  
54  
55  
56      FutureCompletionStage<V> sync() throws InterruptedException;
57  
58      
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  
69  
70  
71  
72  
73  
74  
75  
76      default <T> T join(BiFunction<V, Throwable, T> resultHandler) throws InterruptedException {
77          Objects.requireNonNull(resultHandler, "resultHandler");
78          await();
79          var fut = future();
80          if (fut.isSuccess()) {
81              return resultHandler.apply(fut.getNow(), null);
82          } else {
83              return resultHandler.apply(null, fut.cause());
84          }
85      }
86  
87      
88  
89  
90  
91  
92      FutureCompletionStage<V> await() throws InterruptedException;
93  
94      
95  
96  
97  
98  
99  
100     boolean await(long timeout, TimeUnit unit) throws InterruptedException;
101 
102     
103 
104 
105 
106 
107 
108     default Throwable getCause() throws InterruptedException {
109         await();
110         return cause();
111     }
112 
113     
114 
115 
116     Future<V> future();
117 
118     @Override
119     default boolean cancel() {
120         return future().cancel();
121     }
122 
123     @Override
124     default boolean isSuccess() {
125         return future().isSuccess();
126     }
127 
128     @Override
129     default boolean isFailed() {
130         return future().isFailed();
131     }
132 
133     @Override
134     default boolean isCancellable() {
135         return future().isCancellable();
136     }
137 
138     @Override
139     default V getNow() {
140         return future().getNow();
141     }
142 
143     @Override
144     default Throwable cause() {
145         return future().cause();
146     }
147 
148     @Override
149     default EventExecutor executor() {
150         return future().executor();
151     }
152 
153     
154 
155 
156     @Override
157     default CompletableFuture<V> toCompletableFuture() {
158         throw new UnsupportedOperationException("Not supported by "
159                 + StringUtil.simpleClassName(FutureCompletionStage.class));
160     }
161 
162     @Override
163     <U> FutureCompletionStage<U> thenApply(Function<? super V, ? extends U> fn);
164 
165     @Override
166     <U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn);
167 
168     @Override
169     FutureCompletionStage<Void> thenAccept(Consumer<? super V> action);
170 
171     @Override
172     FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action);
173 
174     @Override
175     FutureCompletionStage<Void> thenRun(Runnable action);
176 
177     @Override
178     FutureCompletionStage<Void> thenRunAsync(Runnable action);
179 
180     @Override
181     <U, V1> FutureCompletionStage<V1> thenCombine(
182             CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn);
183 
184     @Override
185     <U, V1> FutureCompletionStage<V1> thenCombineAsync(
186             CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn);
187 
188     @Override
189     <U> FutureCompletionStage<Void> thenAcceptBoth(
190             CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action);
191 
192     @Override
193     <U> FutureCompletionStage<Void> thenAcceptBothAsync(
194             CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action);
195 
196     @Override
197     FutureCompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
198 
199     @Override
200     FutureCompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
201 
202     @Override
203     <U> FutureCompletionStage<U> applyToEither(CompletionStage<? extends V> other, Function<? super V, U> fn);
204 
205     @Override
206     <U> FutureCompletionStage<U> applyToEitherAsync(CompletionStage<? extends V> other, Function<? super V, U> fn);
207 
208     @Override
209     FutureCompletionStage<Void> acceptEither(CompletionStage<? extends V> other, Consumer<? super V> action);
210 
211     @Override
212     FutureCompletionStage<Void> acceptEitherAsync(CompletionStage<? extends V> other, Consumer<? super V> action);
213 
214     @Override
215     FutureCompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
216 
217     @Override
218     FutureCompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
219 
220     @Override
221     <U> FutureCompletionStage<U> thenCompose(Function<? super V, ? extends CompletionStage<U>> fn);
222 
223     @Override
224     <U> FutureCompletionStage<U> thenComposeAsync(Function<? super V, ? extends CompletionStage<U>> fn);
225 
226     @Override
227     FutureCompletionStage<V> whenComplete(BiConsumer<? super V, ? super Throwable> action);
228 
229     @Override
230     FutureCompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action);
231 
232     @Override
233     <U> FutureCompletionStage<U> handle(BiFunction<? super V, Throwable, ? extends U> fn);
234 
235     @Override
236     <U> FutureCompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn);
237 
238     @Override
239     <U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn, Executor executor);
240 
241     @Override
242     FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action, Executor executor);
243 
244     @Override
245     FutureCompletionStage<Void> thenRunAsync(Runnable action, Executor executor);
246 
247     @Override
248     <U, V1> FutureCompletionStage<V1> thenCombineAsync(
249             CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn, Executor executor);
250 
251     @Override
252     <U> FutureCompletionStage<Void> thenAcceptBothAsync(
253             CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action, Executor executor);
254 
255     @Override
256     FutureCompletionStage<Void> runAfterBothAsync(
257             CompletionStage<?> other, Runnable action, Executor executor);
258 
259     @Override
260     <U> FutureCompletionStage<U> applyToEitherAsync(
261             CompletionStage<? extends V> other, Function<? super V, U> fn, Executor executor);
262 
263     @Override
264     FutureCompletionStage<Void> acceptEitherAsync(
265             CompletionStage<? extends V> other, Consumer<? super V> action, Executor executor);
266 
267     @Override
268     FutureCompletionStage<Void> runAfterEitherAsync(
269             CompletionStage<?> other, Runnable action, Executor executor);
270 
271     @Override
272     <U> FutureCompletionStage<U> thenComposeAsync(
273             Function<? super V, ? extends CompletionStage<U>> fn, Executor executor);
274 
275     @Override
276     FutureCompletionStage<V> exceptionally(Function<Throwable, ? extends V> fn);
277 
278     @Override
279     FutureCompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action, Executor executor);
280 
281     @Override
282     <U> FutureCompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn, Executor executor);
283 
284     
285 
286 
287 
288     static <U> FutureCompletionStage<U> toFutureCompletionStage(CompletionStage<U> stage, EventExecutor executor) {
289         Objects.requireNonNull(stage, "stage");
290         Objects.requireNonNull(executor, "executor");
291         if (stage instanceof FutureCompletionStage && ((FutureCompletionStage<?>) stage).executor() == executor) {
292             return (FutureCompletionStage<U>) stage;
293         }
294 
295         
296         if (stage instanceof CompletableFuture) {
297             CompletableFuture<U> future = (CompletableFuture<U>) stage;
298             if (future.isDone() && !future.isCompletedExceptionally()) {
299                 return executor.newSucceededFuture(future.getNow(null)).asStage();
300             }
301         }
302 
303         Promise<U> promise = executor.newPromise();
304         stage.whenComplete((v, cause) -> {
305             if (cause != null) {
306                 promise.setFailure(cause);
307             } else {
308                 promise.setSuccess(v);
309             }
310         });
311         return promise.asFuture().asStage();
312     }
313 }