View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util.concurrent;
17  
18  import io.netty5.util.internal.StringUtil;
19  
20  import java.util.Objects;
21  import java.util.concurrent.CancellationException;
22  import java.util.concurrent.CompletableFuture;
23  import java.util.concurrent.CompletionException;
24  import java.util.concurrent.CompletionStage;
25  import java.util.concurrent.Executor;
26  import java.util.concurrent.TimeUnit;
27  import java.util.function.BiConsumer;
28  import java.util.function.BiFunction;
29  import java.util.function.Consumer;
30  import java.util.function.Function;
31  
32  /**
33   * A {@link CompletionStage} that provides the same threading semantics and guarantees as the underlying
34   * {@link Future}, which means that all the callbacks will be executed by {@link #executor()}
35   * if not specified otherwise (by calling the corresponding *Async methods).
36   * <p>
37   * This interface also extends {@link java.util.concurrent.Future}, to provide blocking methods for awaiting the result
38   * of the future.
39   * This is in contrast to the Netty {@link Future}, which is entirely non-blocking.
40   * <p>
41   * Please be aware that {@link FutureCompletionStage#toCompletableFuture()} is not supported and so will throw
42   * an {@link UnsupportedOperationException} when invoked.
43   *
44   * @param <V> the value type.
45   */
46  public interface FutureCompletionStage<V>
47          extends CompletionStage<V>, java.util.concurrent.Future<V>, AsynchronousResult<V> {
48  
49      /**
50       * Waits for this future until it is done, and rethrows the cause of the failure if this future failed.
51       *
52       * @throws CancellationException if the computation was cancelled
53       * @throws CompletionException   if the computation threw an exception.
54       * @throws InterruptedException  if the current thread was interrupted while waiting
55       */
56      FutureCompletionStage<V> sync() throws InterruptedException;
57  
58      /**
59       * Waits for the future to complete, then calls the given result handler with the outcome.
60       * <p>
61       * If the future completes successfully, then the result handler is called with the result of the future -
62       * which may be {@code null} - and a {@code null} exception.
63       * <p>
64       * If the future fails, then the result handler is called with a {@code null} result, and a non-{@code null}
65       * exception.
66       * <p>
67       * Success or failure of the future can be determined on whether the exception is {@code null} or not.
68       * <p>
69       * The result handler may compute a new result, which will be the return value of the {@code join} call.
70       *
71       * @param resultHandler The function that will process the result of the completed future.
72       * @return The result of the {@code resultHandler} computation.
73       * @param <T> The return type of the {@code resultHandler}.
74       * @throws InterruptedException if the thread is interrupted while waiting for the future to complete.
75       */
76      default <T> T join(BiFunction<V, Throwable, T> resultHandler) throws InterruptedException {
77          Objects.requireNonNull(resultHandler, "resultHandler");
78          await();
79          var fut = future();
80          if (fut.isSuccess()) {
81              return resultHandler.apply(fut.getNow(), null);
82          } else {
83              return resultHandler.apply(null, fut.cause());
84          }
85      }
86  
87      /**
88       * Waits for this future to be completed.
89       *
90       * @throws InterruptedException if the current thread was interrupted
91       */
92      FutureCompletionStage<V> await() throws InterruptedException;
93  
94      /**
95       * Waits for this future to be completed within the specified time limit.
96       *
97       * @return {@code true} if and only if the future was completed within the specified time limit
98       * @throws InterruptedException if the current thread was interrupted
99       */
100     boolean await(long timeout, TimeUnit unit) throws InterruptedException;
101 
102     /**
103      * Wait for the future to complete, and return the cause if it failed, or {@code null} if it succeeded.
104      *
105      * @return The exception that caused the future to fail, if any, or {@code null}.
106      * @throws InterruptedException if the current thread was interrupted while waiting.
107      */
108     default Throwable getCause() throws InterruptedException {
109         await();
110         return cause();
111     }
112 
113     /**
114      * Returns the underlying {@link Future} of this {@link FutureCompletionStage}.
115      */
116     Future<V> future();
117 
118     @Override
119     default boolean cancel() {
120         return future().cancel();
121     }
122 
123     @Override
124     default boolean isSuccess() {
125         return future().isSuccess();
126     }
127 
128     @Override
129     default boolean isFailed() {
130         return future().isFailed();
131     }
132 
133     @Override
134     default boolean isCancellable() {
135         return future().isCancellable();
136     }
137 
138     @Override
139     default V getNow() {
140         return future().getNow();
141     }
142 
143     @Override
144     default Throwable cause() {
145         return future().cause();
146     }
147 
148     @Override
149     default EventExecutor executor() {
150         return future().executor();
151     }
152 
153     /**
154      * Not supported and so throws an {@link UnsupportedOperationException}.
155      */
156     @Override
157     default CompletableFuture<V> toCompletableFuture() {
158         throw new UnsupportedOperationException("Not supported by "
159                 + StringUtil.simpleClassName(FutureCompletionStage.class));
160     }
161 
162     @Override
163     <U> FutureCompletionStage<U> thenApply(Function<? super V, ? extends U> fn);
164 
165     @Override
166     <U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn);
167 
168     @Override
169     FutureCompletionStage<Void> thenAccept(Consumer<? super V> action);
170 
171     @Override
172     FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action);
173 
174     @Override
175     FutureCompletionStage<Void> thenRun(Runnable action);
176 
177     @Override
178     FutureCompletionStage<Void> thenRunAsync(Runnable action);
179 
180     @Override
181     <U, V1> FutureCompletionStage<V1> thenCombine(
182             CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn);
183 
184     @Override
185     <U, V1> FutureCompletionStage<V1> thenCombineAsync(
186             CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn);
187 
188     @Override
189     <U> FutureCompletionStage<Void> thenAcceptBoth(
190             CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action);
191 
192     @Override
193     <U> FutureCompletionStage<Void> thenAcceptBothAsync(
194             CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action);
195 
196     @Override
197     FutureCompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
198 
199     @Override
200     FutureCompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
201 
202     @Override
203     <U> FutureCompletionStage<U> applyToEither(CompletionStage<? extends V> other, Function<? super V, U> fn);
204 
205     @Override
206     <U> FutureCompletionStage<U> applyToEitherAsync(CompletionStage<? extends V> other, Function<? super V, U> fn);
207 
208     @Override
209     FutureCompletionStage<Void> acceptEither(CompletionStage<? extends V> other, Consumer<? super V> action);
210 
211     @Override
212     FutureCompletionStage<Void> acceptEitherAsync(CompletionStage<? extends V> other, Consumer<? super V> action);
213 
214     @Override
215     FutureCompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
216 
217     @Override
218     FutureCompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
219 
220     @Override
221     <U> FutureCompletionStage<U> thenCompose(Function<? super V, ? extends CompletionStage<U>> fn);
222 
223     @Override
224     <U> FutureCompletionStage<U> thenComposeAsync(Function<? super V, ? extends CompletionStage<U>> fn);
225 
226     @Override
227     FutureCompletionStage<V> whenComplete(BiConsumer<? super V, ? super Throwable> action);
228 
229     @Override
230     FutureCompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action);
231 
232     @Override
233     <U> FutureCompletionStage<U> handle(BiFunction<? super V, Throwable, ? extends U> fn);
234 
235     @Override
236     <U> FutureCompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn);
237 
238     @Override
239     <U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn, Executor executor);
240 
241     @Override
242     FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action, Executor executor);
243 
244     @Override
245     FutureCompletionStage<Void> thenRunAsync(Runnable action, Executor executor);
246 
247     @Override
248     <U, V1> FutureCompletionStage<V1> thenCombineAsync(
249             CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn, Executor executor);
250 
251     @Override
252     <U> FutureCompletionStage<Void> thenAcceptBothAsync(
253             CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action, Executor executor);
254 
255     @Override
256     FutureCompletionStage<Void> runAfterBothAsync(
257             CompletionStage<?> other, Runnable action, Executor executor);
258 
259     @Override
260     <U> FutureCompletionStage<U> applyToEitherAsync(
261             CompletionStage<? extends V> other, Function<? super V, U> fn, Executor executor);
262 
263     @Override
264     FutureCompletionStage<Void> acceptEitherAsync(
265             CompletionStage<? extends V> other, Consumer<? super V> action, Executor executor);
266 
267     @Override
268     FutureCompletionStage<Void> runAfterEitherAsync(
269             CompletionStage<?> other, Runnable action, Executor executor);
270 
271     @Override
272     <U> FutureCompletionStage<U> thenComposeAsync(
273             Function<? super V, ? extends CompletionStage<U>> fn, Executor executor);
274 
275     @Override
276     FutureCompletionStage<V> exceptionally(Function<Throwable, ? extends V> fn);
277 
278     @Override
279     FutureCompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action, Executor executor);
280 
281     @Override
282     <U> FutureCompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn, Executor executor);
283 
284     /**
285      * Returns a {@link FutureCompletionStage} for the given {@link CompletionStage}
286      * that is pinned to the given {@link EventExecutor}.
287      */
288     static <U> FutureCompletionStage<U> toFutureCompletionStage(CompletionStage<U> stage, EventExecutor executor) {
289         Objects.requireNonNull(stage, "stage");
290         Objects.requireNonNull(executor, "executor");
291         if (stage instanceof FutureCompletionStage && ((FutureCompletionStage<?>) stage).executor() == executor) {
292             return (FutureCompletionStage<U>) stage;
293         }
294 
295         // Try fast-path for CompletableFuture instances that are already complete to reduce object creation.
296         if (stage instanceof CompletableFuture) {
297             CompletableFuture<U> future = (CompletableFuture<U>) stage;
298             if (future.isDone() && !future.isCompletedExceptionally()) {
299                 return executor.newSucceededFuture(future.getNow(null)).asStage();
300             }
301         }
302 
303         Promise<U> promise = executor.newPromise();
304         stage.whenComplete((v, cause) -> {
305             if (cause != null) {
306                 promise.setFailure(cause);
307             } else {
308                 promise.setSuccess(v);
309             }
310         });
311         return promise.asFuture().asStage();
312     }
313 }