1 /*
2 * Copyright 2021 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.logging.InternalLogger;
19 import io.netty5.util.internal.logging.InternalLoggerFactory;
20
21 import java.util.concurrent.Callable;
22 import java.util.function.Function;
23
24 import static io.netty5.util.internal.PromiseNotificationUtil.tryFailure;
25 import static java.util.Objects.requireNonNull;
26
27 /**
28 * Combinator operations on {@linkplain Future futures}.
29 * <p>
30 * Used for implementing {@link Future#map(Function)} and {@link Future#flatMap(Function)}
31 *
32 * @implNote The operations themselves are implemented as static inner classes instead of lambdas to aid debugging.
33 */
34 final class Futures {
35 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Futures.class);
36 private static final PassThrough<?> PASS_THROUGH = new PassThrough<>();
37 private static final PropagateCancel PROPAGATE_CANCEL = new PropagateCancel();
38
39 /**
40 * Creates a new {@link Future} that will complete with the result of the given {@link Future} mapped through the
41 * given mapper function.
42 * <p>
43 * If the given future fails, then the returned future will fail as well, with the same exception. Cancellation of
44 * either future will cancel the other. If the mapper function throws, the returned future will fail, but the given
45 * future will be unaffected.
46 *
47 * @param future The future whose result will flow to the returned future, through the mapping function.
48 * @param mapper The function that will convert the result of the given future into the result of the returned
49 * future.
50 * @param <R> The result type of the mapper function, and of the returned future.
51 * @return A new future instance that will complete with the mapped result of the given future.
52 */
53 public static <V, R> Future<R> map(Future<V> future, Function<V, R> mapper) {
54 requireNonNull(future, "future");
55 requireNonNull(mapper, "mapper");
56 if (future.isFailed()) {
57 @SuppressWarnings("unchecked") // Cast is safe because the result type is not used in failed futures.
58 Future<R> failed = (Future<R>) future;
59 return failed;
60 }
61 if (future.isSuccess()) {
62 return future.executor().submit(new CallableMapper<>(future, mapper));
63 }
64 Promise<R> promise = future.executor().newPromise();
65 future.addListener(new Mapper<>(promise, mapper));
66 Future<R> mappedFuture = promise.asFuture();
67 mappedFuture.addListener(future, propagateCancel());
68 return mappedFuture;
69 }
70
71 /**
72 * Creates a new {@link Future} that will complete with the result of the given {@link Future} flat-mapped through
73 * the given mapper function.
74 * <p>
75 * The "flat" in "flat-map" means the given mapper function produces a result that itself is a future-of-R, yet this
76 * method also returns a future-of-R, rather than a future-of-future-of-R. In other words, if the same mapper
77 * function was used with the {@link #map(Future, Function)} method, you would get back a {@code Future<Future<R>>}.
78 * These nested futures are "flattened" into a {@code Future<R>} by this method.
79 * <p>
80 * Effectively, this method behaves similar to this serial code, except asynchronously and with proper exception and
81 * cancellation handling:
82 * <pre>{@code
83 * V x = future.sync().getNow();
84 * Future<R> y = mapper.apply(x);
85 * R result = y.sync().getNow();
86 * }</pre>
87 * <p>
88 * If the given future fails, then the returned future will fail as well, with the same exception. Cancellation of
89 * either future will cancel the other. If the mapper function throws, the returned future will fail, but the given
90 * future will be unaffected.
91 *
92 * @param mapper The function that will convert the result of the given future into the result of the returned
93 * future.
94 * @param <R> The result type of the mapper function, and of the returned future.
95 * @return A new future instance that will complete with the mapped result of the given future.
96 */
97 public static <V, R> Future<R> flatMap(Future<V> future, Function<V, Future<R>> mapper) {
98 requireNonNull(future, "future");
99 requireNonNull(mapper, "mapper");
100 Promise<R> promise = future.executor().newPromise();
101 future.addListener(new FlatMapper<>(promise, mapper));
102 Future<R> mappedFuture = promise.asFuture();
103 if (!future.isSuccess()) {
104 // Propagate cancellation if future is either incomplete or failed.
105 // Failed means it could be cancelled, so that needs to be propagated.
106 mappedFuture.addListener(future, propagateCancel());
107 }
108 return mappedFuture;
109 }
110
111 @SuppressWarnings("unchecked")
112 static FutureContextListener<Future<?>, Object> propagateCancel() {
113 return (FutureContextListener<Future<?>, Object>) (FutureContextListener<?, ?>) PROPAGATE_CANCEL;
114 }
115
116 @SuppressWarnings("unchecked")
117 static <R> FutureContextListener<Promise<R>, Object> passThrough() {
118 return (FutureContextListener<Promise<R>, Object>) (FutureContextListener<?, ?>) PASS_THROUGH;
119 }
120
121 static <A, B> void propagateUncommonCompletion(Future<? extends A> completed, Promise<B> recipient) {
122 if (completed.isCancelled()) {
123 // Don't check or log if cancellation propagation fails.
124 // Propagation goes both ways, which means at least one future will already be cancelled here.
125 recipient.cancel();
126 } else {
127 Throwable cause = completed.cause();
128 recipient.tryFailure(cause);
129 }
130 }
131
132 private Futures() {
133 }
134
135 private static final class PropagateCancel implements FutureContextListener<Future<Object>, Object> {
136 @Override
137 public void operationComplete(Future<Object> context, Future<?> future) throws Exception {
138 if (future.isCancelled()) {
139 context.cancel();
140 }
141 }
142 }
143
144 private static final class PassThrough<R> implements FutureContextListener<Promise<R>, Object> {
145 @Override
146 public void operationComplete(Promise<R> recipient, Future<?> completed) throws Exception {
147 if (completed.isSuccess()) {
148 try {
149 @SuppressWarnings("unchecked")
150 R result = (R) completed.getNow();
151 recipient.trySuccess(result);
152 } catch (Throwable e) {
153 tryFailure(recipient, e, logger);
154 }
155 } else {
156 propagateUncommonCompletion(completed, recipient);
157 }
158 }
159 }
160
161 private static final class CallableMapper<R, T> implements Callable<R> {
162 private final Future<T> future;
163 private final Function<T, R> mapper;
164
165 CallableMapper(Future<T> future, Function<T, R> mapper) {
166 this.future = future;
167 this.mapper = mapper;
168 }
169
170 @Override
171 public R call() throws Exception {
172 return mapper.apply(future.getNow());
173 }
174 }
175
176 private static final class Mapper<R, T> implements FutureListener<Object> {
177 private final Promise<R> recipient;
178 private final Function<T, R> mapper;
179
180 Mapper(Promise<R> recipient, Function<T, R> mapper) {
181 this.recipient = recipient;
182 this.mapper = mapper;
183 }
184
185 @Override
186 public void operationComplete(Future<?> completed) throws Exception {
187 if (completed.isSuccess()) {
188 try {
189 @SuppressWarnings("unchecked")
190 T result = (T) completed.getNow();
191 R mapped = mapper.apply(result);
192 recipient.trySuccess(mapped);
193 } catch (Throwable e) {
194 tryFailure(recipient, e, logger);
195 }
196 } else {
197 propagateUncommonCompletion(completed, recipient);
198 }
199 }
200 }
201
202 private static final class FlatMapper<R, T> implements FutureListener<Object> {
203 private final Promise<R> recipient;
204 private final Function<T, Future<R>> mapper;
205
206 FlatMapper(Promise<R> recipient, Function<T, Future<R>> mapper) {
207 this.recipient = recipient;
208 this.mapper = mapper;
209 }
210
211 @Override
212 public void operationComplete(Future<?> completed) throws Exception {
213 if (completed.isSuccess()) {
214 try {
215 @SuppressWarnings("unchecked")
216 T result = (T) completed.getNow();
217 Future<R> future = mapper.apply(result);
218 if (future.isSuccess()) {
219 recipient.trySuccess(future.getNow());
220 } else if (future.isFailed()) {
221 propagateUncommonCompletion(future, recipient);
222 } else {
223 future.addListener(recipient, passThrough());
224 recipient.asFuture().addListener(future, propagateCancel());
225 }
226 } catch (Throwable e) {
227 tryFailure(recipient, e, logger);
228 }
229 } else {
230 propagateUncommonCompletion(completed, recipient);
231 }
232 }
233 }
234
235 /**
236 * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
237 * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
238 * the {@link Promise} is cancelled and vice-versa.
239 *
240 * @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}.
241 * @param promise the {@link Promise} which will be notified
242 * @param <V> the type of the value.
243 */
244 static <V> void cascade(final Future<V> future, final Promise<? super V> promise) {
245 requireNonNull(future, "future");
246 requireNonNull(promise, "promise");
247
248 if (!future.isSuccess()) {
249 // Propagate cancellation if future is either incomplete or failed.
250 // Failed means it could be cancelled, so that needs to be propagated.
251 promise.asFuture().addListener(future, propagateCancel());
252 }
253 future.addListener(promise, passThrough());
254 }
255 }