View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util.concurrent;
17  
18  import io.netty5.util.internal.logging.InternalLogger;
19  import io.netty5.util.internal.logging.InternalLoggerFactory;
20  
21  import java.util.concurrent.Callable;
22  import java.util.function.Function;
23  
24  import static io.netty5.util.internal.PromiseNotificationUtil.tryFailure;
25  import static java.util.Objects.requireNonNull;
26  
27  /**
28   * Combinator operations on {@linkplain Future futures}.
29   * <p>
30   * Used for implementing {@link Future#map(Function)} and {@link Future#flatMap(Function)}
31   *
32   * @implNote The operations themselves are implemented as static inner classes instead of lambdas to aid debugging.
33   */
34  final class Futures {
35      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Futures.class);
36      private static final PassThrough<?> PASS_THROUGH = new PassThrough<>();
37      private static final PropagateCancel PROPAGATE_CANCEL = new PropagateCancel();
38  
39      /**
40       * Creates a new {@link Future} that will complete with the result of the given {@link Future} mapped through the
41       * given mapper function.
42       * <p>
43       * If the given future fails, then the returned future will fail as well, with the same exception. Cancellation of
44       * either future will cancel the other. If the mapper function throws, the returned future will fail, but the given
45       * future will be unaffected.
46       *
47       * @param future The future whose result will flow to the returned future, through the mapping function.
48       * @param mapper The function that will convert the result of the given future into the result of the returned
49       *               future.
50       * @param <R>    The result type of the mapper function, and of the returned future.
51       * @return A new future instance that will complete with the mapped result of the given future.
52       */
53      public static <V, R> Future<R> map(Future<V> future, Function<V, R> mapper) {
54          requireNonNull(future, "future");
55          requireNonNull(mapper, "mapper");
56          if (future.isFailed()) {
57              @SuppressWarnings("unchecked") // Cast is safe because the result type is not used in failed futures.
58              Future<R> failed = (Future<R>) future;
59              return failed;
60          }
61          if (future.isSuccess()) {
62              return future.executor().submit(new CallableMapper<>(future, mapper));
63          }
64          Promise<R> promise = future.executor().newPromise();
65          future.addListener(new Mapper<>(promise, mapper));
66          Future<R> mappedFuture = promise.asFuture();
67          mappedFuture.addListener(future, propagateCancel());
68          return mappedFuture;
69      }
70  
71      /**
72       * Creates a new {@link Future} that will complete with the result of the given {@link Future} flat-mapped through
73       * the given mapper function.
74       * <p>
75       * The "flat" in "flat-map" means the given mapper function produces a result that itself is a future-of-R, yet this
76       * method also returns a future-of-R, rather than a future-of-future-of-R. In other words, if the same mapper
77       * function was used with the {@link #map(Future, Function)} method, you would get back a {@code Future<Future<R>>}.
78       * These nested futures are "flattened" into a {@code Future<R>} by this method.
79       * <p>
80       * Effectively, this method behaves similar to this serial code, except asynchronously and with proper exception and
81       * cancellation handling:
82       * <pre>{@code
83       * V x = future.sync().getNow();
84       * Future<R> y = mapper.apply(x);
85       * R result = y.sync().getNow();
86       * }</pre>
87       * <p>
88       * If the given future fails, then the returned future will fail as well, with the same exception. Cancellation of
89       * either future will cancel the other. If the mapper function throws, the returned future will fail, but the given
90       * future will be unaffected.
91       *
92       * @param mapper The function that will convert the result of the given future into the result of the returned
93       *               future.
94       * @param <R>    The result type of the mapper function, and of the returned future.
95       * @return A new future instance that will complete with the mapped result of the given future.
96       */
97      public static <V, R> Future<R> flatMap(Future<V> future, Function<V, Future<R>> mapper) {
98          requireNonNull(future, "future");
99          requireNonNull(mapper, "mapper");
100         Promise<R> promise = future.executor().newPromise();
101         future.addListener(new FlatMapper<>(promise, mapper));
102         Future<R> mappedFuture = promise.asFuture();
103         if (!future.isSuccess()) {
104             // Propagate cancellation if future is either incomplete or failed.
105             // Failed means it could be cancelled, so that needs to be propagated.
106             mappedFuture.addListener(future, propagateCancel());
107         }
108         return mappedFuture;
109     }
110 
111     @SuppressWarnings("unchecked")
112     static FutureContextListener<Future<?>, Object> propagateCancel() {
113         return (FutureContextListener<Future<?>, Object>) (FutureContextListener<?, ?>) PROPAGATE_CANCEL;
114     }
115 
116     @SuppressWarnings("unchecked")
117     static <R> FutureContextListener<Promise<R>, Object> passThrough() {
118         return (FutureContextListener<Promise<R>, Object>) (FutureContextListener<?, ?>) PASS_THROUGH;
119     }
120 
121     static <A, B> void propagateUncommonCompletion(Future<? extends A> completed, Promise<B> recipient) {
122         if (completed.isCancelled()) {
123             // Don't check or log if cancellation propagation fails.
124             // Propagation goes both ways, which means at least one future will already be cancelled here.
125             recipient.cancel();
126         } else {
127             Throwable cause = completed.cause();
128             recipient.tryFailure(cause);
129         }
130     }
131 
132     private Futures() {
133     }
134 
135     private static final class PropagateCancel implements FutureContextListener<Future<Object>, Object> {
136         @Override
137         public void operationComplete(Future<Object> context, Future<?> future) throws Exception {
138             if (future.isCancelled()) {
139                 context.cancel();
140             }
141         }
142     }
143 
144     private static final class PassThrough<R> implements FutureContextListener<Promise<R>, Object> {
145         @Override
146         public void operationComplete(Promise<R> recipient, Future<?> completed) throws Exception {
147             if (completed.isSuccess()) {
148                 try {
149                     @SuppressWarnings("unchecked")
150                     R result = (R) completed.getNow();
151                     recipient.trySuccess(result);
152                 } catch (Throwable e) {
153                     tryFailure(recipient, e, logger);
154                 }
155             } else {
156                 propagateUncommonCompletion(completed, recipient);
157             }
158         }
159     }
160 
161     private static final class CallableMapper<R, T> implements Callable<R> {
162         private final Future<T> future;
163         private final Function<T, R> mapper;
164 
165         CallableMapper(Future<T> future, Function<T, R> mapper) {
166             this.future = future;
167             this.mapper = mapper;
168         }
169 
170         @Override
171         public R call() throws Exception {
172             return mapper.apply(future.getNow());
173         }
174     }
175 
176     private static final class Mapper<R, T> implements FutureListener<Object> {
177         private final Promise<R> recipient;
178         private final Function<T, R> mapper;
179 
180         Mapper(Promise<R> recipient, Function<T, R> mapper) {
181             this.recipient = recipient;
182             this.mapper = mapper;
183         }
184 
185         @Override
186         public void operationComplete(Future<?> completed) throws Exception {
187             if (completed.isSuccess()) {
188                 try {
189                     @SuppressWarnings("unchecked")
190                     T result = (T) completed.getNow();
191                     R mapped = mapper.apply(result);
192                     recipient.trySuccess(mapped);
193                 } catch (Throwable e) {
194                     tryFailure(recipient, e, logger);
195                 }
196             } else {
197                 propagateUncommonCompletion(completed, recipient);
198             }
199         }
200     }
201 
202     private static final class FlatMapper<R, T> implements FutureListener<Object> {
203         private final Promise<R> recipient;
204         private final Function<T, Future<R>> mapper;
205 
206         FlatMapper(Promise<R> recipient, Function<T, Future<R>> mapper) {
207             this.recipient = recipient;
208             this.mapper = mapper;
209         }
210 
211         @Override
212         public void operationComplete(Future<?> completed) throws Exception {
213             if (completed.isSuccess()) {
214                 try {
215                     @SuppressWarnings("unchecked")
216                     T result = (T) completed.getNow();
217                     Future<R> future = mapper.apply(result);
218                     if (future.isSuccess()) {
219                         recipient.trySuccess(future.getNow());
220                     } else if (future.isFailed()) {
221                         propagateUncommonCompletion(future, recipient);
222                     } else {
223                         future.addListener(recipient, passThrough());
224                         recipient.asFuture().addListener(future, propagateCancel());
225                     }
226                 } catch (Throwable e) {
227                     tryFailure(recipient, e, logger);
228                 }
229             } else {
230                 propagateUncommonCompletion(completed, recipient);
231             }
232         }
233     }
234 
235     /**
236      * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
237      * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
238      * the {@link Promise} is cancelled and vice-versa.
239      *
240      * @param future            the {@link Future} which will be used to listen to for notifying the {@link Promise}.
241      * @param promise           the {@link Promise} which will be notified
242      * @param <V>               the type of the value.
243      */
244     static <V> void cascade(final Future<V> future, final Promise<? super V> promise) {
245         requireNonNull(future, "future");
246         requireNonNull(promise, "promise");
247 
248         if (!future.isSuccess()) {
249             // Propagate cancellation if future is either incomplete or failed.
250             // Failed means it could be cancelled, so that needs to be propagated.
251             promise.asFuture().addListener(future, propagateCancel());
252         }
253         future.addListener(promise, passThrough());
254     }
255 }