1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.logging.InternalLogger;
19 import io.netty5.util.internal.logging.InternalLoggerFactory;
20
21 import java.util.concurrent.Callable;
22 import java.util.function.Function;
23
24 import static io.netty5.util.internal.PromiseNotificationUtil.tryFailure;
25 import static java.util.Objects.requireNonNull;
26
27
28
29
30
31
32
33
34 final class Futures {
35 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Futures.class);
36 private static final PassThrough<?> PASS_THROUGH = new PassThrough<>();
37 private static final PropagateCancel PROPAGATE_CANCEL = new PropagateCancel();
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public static <V, R> Future<R> map(Future<V> future, Function<V, R> mapper) {
54 requireNonNull(future, "future");
55 requireNonNull(mapper, "mapper");
56 if (future.isFailed()) {
57 @SuppressWarnings("unchecked")
58 Future<R> failed = (Future<R>) future;
59 return failed;
60 }
61 if (future.isSuccess()) {
62 return future.executor().submit(new CallableMapper<>(future, mapper));
63 }
64 Promise<R> promise = future.executor().newPromise();
65 future.addListener(new Mapper<>(promise, mapper));
66 Future<R> mappedFuture = promise.asFuture();
67 mappedFuture.addListener(future, propagateCancel());
68 return mappedFuture;
69 }
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 public static <V, R> Future<R> flatMap(Future<V> future, Function<V, Future<R>> mapper) {
98 requireNonNull(future, "future");
99 requireNonNull(mapper, "mapper");
100 Promise<R> promise = future.executor().newPromise();
101 future.addListener(new FlatMapper<>(promise, mapper));
102 Future<R> mappedFuture = promise.asFuture();
103 if (!future.isSuccess()) {
104
105
106 mappedFuture.addListener(future, propagateCancel());
107 }
108 return mappedFuture;
109 }
110
111 @SuppressWarnings("unchecked")
112 static FutureContextListener<Future<?>, Object> propagateCancel() {
113 return (FutureContextListener<Future<?>, Object>) (FutureContextListener<?, ?>) PROPAGATE_CANCEL;
114 }
115
116 @SuppressWarnings("unchecked")
117 static <R> FutureContextListener<Promise<R>, Object> passThrough() {
118 return (FutureContextListener<Promise<R>, Object>) (FutureContextListener<?, ?>) PASS_THROUGH;
119 }
120
121 static <A, B> void propagateUncommonCompletion(Future<? extends A> completed, Promise<B> recipient) {
122 if (completed.isCancelled()) {
123
124
125 recipient.cancel();
126 } else {
127 Throwable cause = completed.cause();
128 recipient.tryFailure(cause);
129 }
130 }
131
// Holder of static helpers only; the private constructor keeps it from being instantiated elsewhere.
private Futures() { }
134
135 private static final class PropagateCancel implements FutureContextListener<Future<Object>, Object> {
136 @Override
137 public void operationComplete(Future<Object> context, Future<?> future) throws Exception {
138 if (future.isCancelled()) {
139 context.cancel();
140 }
141 }
142 }
143
144 private static final class PassThrough<R> implements FutureContextListener<Promise<R>, Object> {
145 @Override
146 public void operationComplete(Promise<R> recipient, Future<?> completed) throws Exception {
147 if (completed.isSuccess()) {
148 try {
149 @SuppressWarnings("unchecked")
150 R result = (R) completed.getNow();
151 recipient.trySuccess(result);
152 } catch (Throwable e) {
153 tryFailure(recipient, e, logger);
154 }
155 } else {
156 propagateUncommonCompletion(completed, recipient);
157 }
158 }
159 }
160
161 private static final class CallableMapper<R, T> implements Callable<R> {
162 private final Future<T> future;
163 private final Function<T, R> mapper;
164
165 CallableMapper(Future<T> future, Function<T, R> mapper) {
166 this.future = future;
167 this.mapper = mapper;
168 }
169
170 @Override
171 public R call() throws Exception {
172 return mapper.apply(future.getNow());
173 }
174 }
175
176 private static final class Mapper<R, T> implements FutureListener<Object> {
177 private final Promise<R> recipient;
178 private final Function<T, R> mapper;
179
180 Mapper(Promise<R> recipient, Function<T, R> mapper) {
181 this.recipient = recipient;
182 this.mapper = mapper;
183 }
184
185 @Override
186 public void operationComplete(Future<?> completed) throws Exception {
187 if (completed.isSuccess()) {
188 try {
189 @SuppressWarnings("unchecked")
190 T result = (T) completed.getNow();
191 R mapped = mapper.apply(result);
192 recipient.trySuccess(mapped);
193 } catch (Throwable e) {
194 tryFailure(recipient, e, logger);
195 }
196 } else {
197 propagateUncommonCompletion(completed, recipient);
198 }
199 }
200 }
201
202 private static final class FlatMapper<R, T> implements FutureListener<Object> {
203 private final Promise<R> recipient;
204 private final Function<T, Future<R>> mapper;
205
206 FlatMapper(Promise<R> recipient, Function<T, Future<R>> mapper) {
207 this.recipient = recipient;
208 this.mapper = mapper;
209 }
210
211 @Override
212 public void operationComplete(Future<?> completed) throws Exception {
213 if (completed.isSuccess()) {
214 try {
215 @SuppressWarnings("unchecked")
216 T result = (T) completed.getNow();
217 Future<R> future = mapper.apply(result);
218 if (future.isSuccess()) {
219 recipient.trySuccess(future.getNow());
220 } else if (future.isFailed()) {
221 propagateUncommonCompletion(future, recipient);
222 } else {
223 future.addListener(recipient, passThrough());
224 recipient.asFuture().addListener(future, propagateCancel());
225 }
226 } catch (Throwable e) {
227 tryFailure(recipient, e, logger);
228 }
229 } else {
230 propagateUncommonCompletion(completed, recipient);
231 }
232 }
233 }
234
235
236
237
238
239
240
241
242
243
244 static <V> void cascade(final Future<V> future, final Promise<? super V> promise) {
245 requireNonNull(future, "future");
246 requireNonNull(promise, "promise");
247
248 if (!future.isSuccess()) {
249
250
251 promise.asFuture().addListener(future, propagateCancel());
252 }
253 future.addListener(promise, passThrough());
254 }
255 }