1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.StringUtil;
19 import io.netty5.util.internal.ThrowableUtil;
20 import io.netty5.util.internal.logging.InternalLogger;
21 import io.netty5.util.internal.logging.InternalLoggerFactory;
22
23 import java.util.concurrent.CancellationException;
24 import java.util.concurrent.CompletionException;
25 import java.util.concurrent.CompletionStage;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ForkJoinPool;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.TimeoutException;
31 import java.util.concurrent.atomic.AtomicReference;
32 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
33 import java.util.function.BiConsumer;
34 import java.util.function.BiFunction;
35 import java.util.function.Consumer;
36 import java.util.function.Function;
37
38 import static java.util.Objects.requireNonNull;
39
40 public class DefaultPromise<V> implements Promise<V>, Future<V>,
41 FutureCompletionStage<V>, java.util.concurrent.Future<V> {
/** Logger used to report exceptions thrown by listeners while they are being notified. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
/** Logger used when a listener-notification task cannot be submitted to the executor. */
private static final InternalLogger rejectedExecutionLogger =
        InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
/** Updater used for the compare-and-set transitions of the {@code result} field. */
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
/** Sentinel stored in {@code result} when the promise succeeded with a {@code null} value. */
private static final Object SUCCESS = new Object();
/** Sentinel stored in {@code result} by {@code setUncancellable()} while the promise is still incomplete. */
private static final Object UNCANCELLABLE = new Object();
/** Shared holder stored in {@code result} by {@code cancel()}. */
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
        StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
/** Stack trace of the shared cancellation cause; installed into every {@code LeanCancellationException}. */
private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
/** Stand-in passed to {@code addListener0} when a context listener is registered with a {@code null} context. */
static final Object NULL_CONTEXT = new Object();

/**
 * Current state: {@code null} while incomplete, {@link #UNCANCELLABLE}, {@link #SUCCESS},
 * a {@code CauseHolder} on failure/cancellation, or the success value itself.
 */
private volatile Object result;
/** Executor returned by {@code executor()} and used to run listener notifications. */
private final EventExecutor executor;










/**
 * Registered listeners: either a single listener object or a {@code DefaultFutureListeners}.
 * Read and written while holding this object's monitor.
 */
private Object listeners;



/** Number of threads currently blocked in {@code wait()}; updated from within synchronized blocks. */
private short waiters;
72
73
74
75
76
77
78
79
80
81
82
83
/**
 * Creates a new promise that has not been completed yet.
 *
 * @param executor the {@link EventExecutor} returned by {@link #executor()}; listener notifications
 *                 are submitted to it
 * @throws NullPointerException if {@code executor} is {@code null}
 */
protected DefaultPromise(EventExecutor executor) {
    requireNonNull(executor, "executor");
    this.executor = executor;
}
87
88
89
90
91
92
93
94
95
96
97
98 static <V> Promise<V> newSuccessfulPromise(EventExecutor executor, V result) {
99 return new DefaultPromise<>(executor, result);
100 }
101
102
103
104
105
106
107
108
109
110
111
112 static <V> Promise<V> newFailedPromise(EventExecutor executor, Throwable cause) {
113 return new DefaultPromise<>(cause, executor);
114 }
115
/**
 * Creates an already-succeeded promise. A {@code null} result is stored as the {@code SUCCESS} sentinel.
 *
 * @throws NullPointerException if {@code executor} is {@code null}
 */
private DefaultPromise(EventExecutor executor, Object result) {
    this.executor = requireNonNull(executor, "executor");
    if (result == null) {
        this.result = SUCCESS;
    } else {
        this.result = result;
    }
}
120
121 private DefaultPromise(Throwable cause, EventExecutor executor) {
122 this.executor = requireNonNull(executor, "executor");
123 result = new CauseHolder(requireNonNull(cause, "cause"));
124 }
125
126 @Override
127 public Promise<V> setSuccess(V result) {
128 if (setSuccess0(result)) {
129 return this;
130 }
131 throw new IllegalStateException("complete already: " + this);
132 }
133
134 @Override
135 public boolean trySuccess(V result) {
136 return setSuccess0(result);
137 }
138
139 @Override
140 public Promise<V> setFailure(Throwable cause) {
141 if (setFailure0(cause)) {
142 return this;
143 }
144 throw new IllegalStateException("complete already: " + this, cause);
145 }
146
147 @Override
148 public boolean tryFailure(Throwable cause) {
149 return setFailure0(cause);
150 }
151
152 @Override
153 public boolean setUncancellable() {
154 return RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE);
155 }
156
/** Returns this object itself, since it implements both the promise and the future view. */
@Override
public Future<V> asFuture() {
    return this;
}
161
162 @Override
163 public boolean isSuccess() {
164 Object result = this.result;
165 return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
166 }
167
168 @Override
169 public boolean isFailed() {
170 return result instanceof CauseHolder;
171 }
172
173 @Override
174 public boolean isCancellable() {
175 return result == null;
176 }
177
178 private static final class LeanCancellationException extends CancellationException {
179 private static final long serialVersionUID = 2794674970981187807L;
180
181
182 @Override
183 public Throwable fillInStackTrace() {
184 setStackTrace(CANCELLATION_STACK);
185 return this;
186 }
187
188 @Override
189 public String toString() {
190 return CancellationException.class.getName();
191 }
192 }
193
194 @Override
195 public Throwable cause() {
196 return cause0(result);
197 }
198
199 private Throwable cause0(Object result) {
200 if (!isDone0(result)) {
201 throw new IllegalStateException("Cannot call cause() on a future that has not completed.");
202 }
203 if (!(result instanceof CauseHolder)) {
204 return null;
205 }
206 if (result == CANCELLATION_CAUSE_HOLDER) {
207 CancellationException ce = new LeanCancellationException();
208 if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
209 return ce;
210 }
211 result = this.result;
212 }
213 return ((CauseHolder) result).cause;
214 }
215
216 @Override
217 public Future<V> addListener(FutureListener<? super V> listener) {
218 requireNonNull(listener, "listener");
219
220 addListener0(listener, null);
221 if (isDone()) {
222 notifyListeners();
223 }
224
225 return this;
226 }
227
228 @Override
229 public <C> Future<V> addListener(C context, FutureContextListener<? super C, ? super V> listener) {
230 requireNonNull(listener, "listener");
231
232 addListener0(listener, context == null ? NULL_CONTEXT : context);
233 if (isDone()) {
234 notifyListeners();
235 }
236
237 return this;
238 }
239
240 @Override
241 public FutureCompletionStage<V> await() throws InterruptedException {
242 if (isDone()) {
243 return this;
244 }
245
246 if (Thread.interrupted()) {
247 throw new InterruptedException(toString());
248 }
249
250 checkDeadLock();
251
252 synchronized (this) {
253 while (!isDone()) {
254 incWaiters();
255 try {
256 wait();
257 } finally {
258 decWaiters();
259 }
260 }
261 }
262 return this;
263 }
264
265 @Override
266 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
267 return await0(unit.toNanos(timeout), true);
268 }
269
270 @SuppressWarnings("unchecked")
271 @Override
272 public V getNow() {
273 Object result = this.result;
274 if (!isDone0(result)) {
275 throw new IllegalStateException("Cannot call getNow() on a future that has not completed.");
276 }
277 if (result instanceof CauseHolder || result == SUCCESS) {
278 return null;
279 }
280 return (V) result;
281 }
282
283 @SuppressWarnings("unchecked")
284 @Override
285 public V get() throws InterruptedException, ExecutionException {
286 Object result = this.result;
287 if (!isDone0(result)) {
288 await();
289 result = this.result;
290 }
291 if (result == SUCCESS || result == UNCANCELLABLE) {
292 return null;
293 }
294 Throwable cause = cause0(result);
295 if (cause == null) {
296 return (V) result;
297 }
298 if (cause instanceof CancellationException) {
299 throw (CancellationException) cause;
300 }
301 throw new ExecutionException(cause);
302 }
303
304 @SuppressWarnings("unchecked")
305 @Override
306 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
307 Object result = this.result;
308 if (!isDone0(result)) {
309 if (!await(timeout, unit)) {
310 throw new TimeoutException();
311 }
312 result = this.result;
313 }
314 if (result == SUCCESS || result == UNCANCELLABLE) {
315 return null;
316 }
317 Throwable cause = cause0(result);
318 if (cause == null) {
319 return (V) result;
320 }
321 if (cause instanceof CancellationException) {
322 throw (CancellationException) cause;
323 }
324 throw new ExecutionException(cause);
325 }
326
327 @Override
328 public boolean cancel() {
329 if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
330 if (checkNotifyWaiters()) {
331 notifyListeners();
332 }
333 return true;
334 }
335 return false;
336 }
337
338 @Override
339 public boolean isCancelled() {
340 return isCancelled0(result);
341 }
342
343 @Override
344 public boolean isDone() {
345 return isDone0(result);
346 }
347
/**
 * Waits for completion and then rethrows the failure, if any (see {@code rethrowIfFailed()}).
 *
 * @return this stage when the promise completed successfully
 * @throws InterruptedException if interrupted while waiting
 */
@Override
public FutureCompletionStage<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}
354
355 @Override
356 public String toString() {
357 return toStringBuilder().toString();
358 }
359
360 protected StringBuilder toStringBuilder() {
361 StringBuilder buf = new StringBuilder(64)
362 .append(StringUtil.simpleClassName(this))
363 .append('@')
364 .append(Integer.toHexString(hashCode()));
365
366 Object result = this.result;
367 if (result == SUCCESS) {
368 buf.append("(success)");
369 } else if (result == UNCANCELLABLE) {
370 buf.append("(uncancellable)");
371 } else if (result instanceof CauseHolder) {
372 buf.append("(failure: ")
373 .append(((CauseHolder) result).cause)
374 .append(')');
375 } else if (result != null) {
376 buf.append("(success: ")
377 .append(result)
378 .append(')');
379 } else {
380 buf.append("(incomplete)");
381 }
382
383 return buf;
384 }
385
386
387
388
389
390
391
392
393
/** @return the {@link EventExecutor} this promise was created with */
@Override
public final EventExecutor executor() {
    return executor;
}
398
399 protected void checkDeadLock() {
400 checkDeadLock(executor);
401 }
402
403 protected final void checkDeadLock(EventExecutor executor) {
404 if (executor.inEventLoop()) {
405 throw new BlockingOperationException(toString());
406 }
407 }
408
409 private void notifyListeners() {
410 safeExecute(executor(), new NotifyListeners(this));
411 }
412
413 private static final class NotifyListeners implements Runnable {
414 private final DefaultPromise<?> promise;
415
416 private NotifyListeners(DefaultPromise<?> promise) {
417 this.promise = promise;
418 }
419
420 @Override
421 public void run() {
422 promise.notifyListenersNow();
423 }
424 }
425
426 @SuppressWarnings({ "unchecked", "MethodOnlyUsedFromInnerClass" })
427 private void notifyListenersNow() {
428 Object listeners;
429 synchronized (this) {
430
431 if (this.listeners == null) {
432 return;
433 }
434 listeners = this.listeners;
435 this.listeners = null;
436 }
437 for (;;) {
438 if (listeners instanceof DefaultFutureListeners) {
439 notifyListeners0((DefaultFutureListeners) listeners);
440 } else {
441 notifyListener0(this, (FutureListener<V>) listeners);
442 }
443 synchronized (this) {
444 if (this.listeners == null) {
445 return;
446 }
447 listeners = this.listeners;
448 this.listeners = null;
449 }
450 }
451 }
452
/** Delegates notification of a listener collection to {@code DefaultFutureListeners}, passing this class's logger. */
private void notifyListeners0(DefaultFutureListeners listeners) {
    listeners.notifyListeners(this, logger);
}
456
457 static <V> void notifyListener0(Future<V> future, FutureListener<? super V> l) {
458 try {
459 l.operationComplete(future);
460 } catch (Throwable t) {
461 if (logger.isWarnEnabled()) {
462 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
463 }
464 }
465 }
466
467 private synchronized void addListener0(Object listener, Object context) {
468 if (listeners == null && context == null) {
469 listeners = listener;
470 } else if (listeners instanceof DefaultFutureListeners) {
471 ((DefaultFutureListeners) listeners).add(listener, context);
472 } else {
473 DefaultFutureListeners listeners = new DefaultFutureListeners();
474 if (this.listeners != null) {
475 listeners.add(this.listeners, null);
476 }
477 listeners.add(listener, context);
478 this.listeners = listeners;
479 }
480 }
481
482 private boolean setSuccess0(V result) {
483 return setValue0(result == null ? SUCCESS : result);
484 }
485
486 private boolean setFailure0(Throwable cause) {
487 return setValue0(new CauseHolder(requireNonNull(cause, "cause")));
488 }
489
490 private boolean setValue0(Object objResult) {
491 if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
492 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
493 if (checkNotifyWaiters()) {
494 notifyListeners();
495 }
496 return true;
497 }
498 return false;
499 }
500
501
502
503
504
505 private synchronized boolean checkNotifyWaiters() {
506 if (waiters > 0) {
507 notifyAll();
508 }
509 return listeners != null;
510 }
511
512 private void incWaiters() {
513 if (waiters == Short.MAX_VALUE) {
514 throw new IllegalStateException("too many waiters: " + this);
515 }
516 ++waiters;
517 }
518
519 private void decWaiters() {
520 --waiters;
521 }
522
523 private void rethrowIfFailed() {
524 Throwable cause = cause();
525 if (cause == null) {
526 return;
527 }
528 if (cause instanceof CancellationException) {
529 throw (CancellationException) cause;
530 }
531 throw new CompletionException(cause);
532 }
533
534 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
535 if (isDone()) {
536 return true;
537 }
538
539 if (timeoutNanos <= 0) {
540 return isDone();
541 }
542
543 if (interruptable && Thread.interrupted()) {
544 throw new InterruptedException(toString());
545 }
546
547 checkDeadLock();
548
549
550
551 final long startTime = System.nanoTime();
552 synchronized (this) {
553 boolean interrupted = false;
554 try {
555 long waitTime = timeoutNanos;
556 while (!isDone() && waitTime > 0) {
557 incWaiters();
558 try {
559 wait(waitTime / 1000000, (int) (waitTime % 1000000));
560 } catch (InterruptedException e) {
561 if (interruptable) {
562 throw e;
563 } else {
564 interrupted = true;
565 }
566 } finally {
567 decWaiters();
568 }
569
570 if (isDone()) {
571 return true;
572 }
573
574
575 waitTime = timeoutNanos - (System.nanoTime() - startTime);
576 }
577 return isDone();
578 } finally {
579 if (interrupted) {
580 Thread.currentThread().interrupt();
581 }
582 }
583 }
584 }
585
586 private static boolean isCancelled0(Object result) {
587 return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
588 }
589
590 private static boolean isDone0(Object result) {
591 return result != null && result != UNCANCELLABLE;
592 }
593
/** Wrapper stored in {@code result} to mark a completion with a cause (failure or cancellation). */
private static final class CauseHolder {
    /** The wrapped cause. */
    final Throwable cause;
    CauseHolder(Throwable cause) {
        this.cause = cause;
    }
}
600
601 static void safeExecute(EventExecutor executor, Runnable task) {
602 try {
603 executor.execute(task);
604 } catch (Throwable t) {
605 rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
606 }
607 }
608
/** Returns this object itself, since it also implements {@code FutureCompletionStage}. */
@Override
public FutureCompletionStage<V> asStage() {
    return this;
}
613
614
/** Placeholder states used in the atomic references that coordinate two-stage combinators. */
private enum Marker {
    /** No stage has delivered a value yet. */
    EMPTY,
    /** A stage has completed with an error. */
    ERROR
}
619
620
/**
 * Marker executor meaning "run the callback directly instead of dispatching it". It is only compared
 * by identity (see {@code executeDirectly} and {@code otherExecutor}); executing a task on it throws.
 */
private static final Executor SAME_AS_FUTURE = task -> {
    throw new UnsupportedOperationException("Marker executor. Should never be called!");
};
624
/**
 * {@code java.util.concurrent.Future} style cancellation; delegates to {@link #cancel()}.
 *
 * @param mayInterruptIfRunning ignored
 */
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    return cancel();
}
629
/** Returns this object itself as the future backing this stage. */
@Override
public Future<V> future() {
    return this;
}
634
/** Delegates to {@link #thenApplyAsync(Function, Executor)} with the {@code SAME_AS_FUTURE} marker executor. */
@Override
public <U> FutureCompletionStage<U> thenApply(Function<? super V, ? extends U> fn) {
    return thenApplyAsync(fn, SAME_AS_FUTURE);
}

/** Delegates to {@link #thenApplyAsync(Function, Executor)} using {@link ForkJoinPool#commonPool()}. */
@Override
public <U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn) {
    return thenApplyAsync(fn, ForkJoinPool.commonPool());
}

/** Delegates to {@link #thenAcceptAsync(Consumer, Executor)} with the {@code SAME_AS_FUTURE} marker executor. */
@Override
public FutureCompletionStage<Void> thenAccept(Consumer<? super V> action) {
    return thenAcceptAsync(action, SAME_AS_FUTURE);
}

/** Delegates to {@link #thenAcceptAsync(Consumer, Executor)} using {@link ForkJoinPool#commonPool()}. */
@Override
public FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action) {
    return thenAcceptAsync(action, ForkJoinPool.commonPool());
}

/** Delegates to {@link #thenRunAsync(Runnable, Executor)} with the {@code SAME_AS_FUTURE} marker executor. */
@Override
public FutureCompletionStage<Void> thenRun(Runnable action) {
    return thenRunAsync(action, SAME_AS_FUTURE);
}

/** Delegates to {@link #thenRunAsync(Runnable, Executor)} using {@link ForkJoinPool#commonPool()}. */
@Override
public FutureCompletionStage<Void> thenRunAsync(Runnable action) {
    return thenRunAsync(action, ForkJoinPool.commonPool());
}

/** Delegates to the three-argument {@code thenCombineAsync} with the {@code SAME_AS_FUTURE} marker executor. */
@Override
public <U, V1> FutureCompletionStage<V1> thenCombine(
        CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn) {
    return thenCombineAsync(other, fn, SAME_AS_FUTURE);
}

/** Delegates to the three-argument {@code thenCombineAsync} using {@link ForkJoinPool#commonPool()}. */
@Override
public <U, V1> FutureCompletionStage<V1> thenCombineAsync(
        CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn) {
    return thenCombineAsync(other, fn, ForkJoinPool.commonPool());
}

/** Delegates to the three-argument {@code thenAcceptBothAsync} with the {@code SAME_AS_FUTURE} marker executor. */
@Override
public <U> FutureCompletionStage<Void> thenAcceptBoth(
        CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action) {
    return thenAcceptBothAsync(other, action, SAME_AS_FUTURE);
}

/** Delegates to the three-argument {@code thenAcceptBothAsync} using {@link ForkJoinPool#commonPool()}. */
@Override
public <U> FutureCompletionStage<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action) {
    return thenAcceptBothAsync(other, action, ForkJoinPool.commonPool());
}

/** Delegates to the three-argument {@code runAfterBothAsync} with the {@code SAME_AS_FUTURE} marker executor. */
@Override
public FutureCompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
    return runAfterBothAsync(other, action, SAME_AS_FUTURE);
}

/** Delegates to the three-argument {@code runAfterBothAsync} using {@link ForkJoinPool#commonPool()}. */
@Override
public FutureCompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
    return runAfterBothAsync(other, action, ForkJoinPool.commonPool());
}

/** Delegates to the three-argument {@code applyToEitherAsync} with the {@code SAME_AS_FUTURE} marker executor. */
@Override
public <U> FutureCompletionStage<U> applyToEither(
        CompletionStage<? extends V> other, Function<? super V, U> fn) {
    return applyToEitherAsync(other, fn, SAME_AS_FUTURE);
}

/** Delegates to the three-argument {@code applyToEitherAsync} using {@link ForkJoinPool#commonPool()}. */
@Override
public <U> FutureCompletionStage<U> applyToEitherAsync(
        CompletionStage<? extends V> other, Function<? super V, U> fn) {
    return applyToEitherAsync(other, fn, ForkJoinPool.commonPool());
}

/** Delegates to the three-argument {@code acceptEitherAsync} with the {@code SAME_AS_FUTURE} marker executor. */
@Override
public FutureCompletionStage<Void> acceptEither(CompletionStage<? extends V> other, Consumer<? super V> action) {
    return acceptEitherAsync(other, action, SAME_AS_FUTURE);
}

/** Delegates to the three-argument {@code acceptEitherAsync} using {@link ForkJoinPool#commonPool()}. */
@Override
public FutureCompletionStage<Void> acceptEitherAsync(
        CompletionStage<? extends V> other, Consumer<? super V> action) {
    return acceptEitherAsync(other, action, ForkJoinPool.commonPool());
}

/** Delegates to the three-argument {@code runAfterEitherAsync} with the {@code SAME_AS_FUTURE} marker executor. */
@Override
public FutureCompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action) {
    return runAfterEitherAsync(other, action, SAME_AS_FUTURE);
}

/** Delegates to the three-argument {@code runAfterEitherAsync} using {@link ForkJoinPool#commonPool()}. */
@Override
public FutureCompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
    return runAfterEitherAsync(other, action, ForkJoinPool.commonPool());
}

/** Delegates to {@link #thenComposeAsync(Function, Executor)} with the {@code SAME_AS_FUTURE} marker executor. */
@Override
public <U> FutureCompletionStage<U> thenCompose(Function<? super V, ? extends CompletionStage<U>> fn) {
    return thenComposeAsync(fn, SAME_AS_FUTURE);
}

/** Delegates to {@link #thenComposeAsync(Function, Executor)} using {@link ForkJoinPool#commonPool()}. */
@Override
public <U> FutureCompletionStage<U> thenComposeAsync(Function<? super V, ? extends CompletionStage<U>> fn) {
    return thenComposeAsync(fn, ForkJoinPool.commonPool());
}

/** Delegates to {@link #whenCompleteAsync(BiConsumer, Executor)} with the {@code SAME_AS_FUTURE} marker executor. */
@Override
public FutureCompletionStage<V> whenComplete(BiConsumer<? super V, ? super Throwable> action) {
    return whenCompleteAsync(action, SAME_AS_FUTURE);
}

/** Delegates to {@link #whenCompleteAsync(BiConsumer, Executor)} using {@link ForkJoinPool#commonPool()}. */
@Override
public FutureCompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action) {
    return whenCompleteAsync(action, ForkJoinPool.commonPool());
}

/** Delegates to {@link #handleAsync(BiFunction, Executor)} with the {@code SAME_AS_FUTURE} marker executor. */
@Override
public <U> FutureCompletionStage<U> handle(BiFunction<? super V, Throwable, ? extends U> fn) {
    return handleAsync(fn, SAME_AS_FUTURE);
}

/** Delegates to {@link #handleAsync(BiFunction, Executor)} using {@link ForkJoinPool#commonPool()}. */
@Override
public <U> FutureCompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn) {
    return handleAsync(fn, ForkJoinPool.commonPool());
}
761
762 @Override
763 public <U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn, Executor executor) {
764 requireNonNull(fn, "fn");
765 requireNonNull(executor, "executor");
766
767 Promise<U> promise = executor().newPromise();
768 addListener(future -> {
769 Throwable cause = future.cause();
770 if (cause == null) {
771 V value = future.getNow();
772 if (executeDirectly(executor)) {
773 thenApplyAsync0(promise, value, fn);
774 } else {
775 safeExecute(executor, () -> thenApplyAsync0(promise, value, fn), promise);
776 }
777 } else {
778 promise.setFailure(cause);
779 }
780 });
781 return promise.asFuture().asStage();
782 }
783
784 private static <U, V> void thenApplyAsync0(Promise<U> promise, V value, Function<? super V, ? extends U> fn) {
785 final U result;
786 try {
787 result = fn.apply(value);
788 } catch (Throwable cause) {
789 promise.setFailure(cause);
790 return;
791 }
792 promise.setSuccess(result);
793 }
794
795 @Override
796 public FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action, Executor executor) {
797 requireNonNull(action, "action");
798 requireNonNull(executor, "executor");
799
800 Promise<Void> promise = executor().newPromise();
801 addListener(future -> {
802 Throwable cause = future.cause();
803 if (cause == null) {
804 V value = future.getNow();
805 if (executeDirectly(executor)) {
806 thenAcceptAsync0(promise, value, action);
807 } else {
808 safeExecute(executor, () -> thenAcceptAsync0(promise, value, action), promise);
809 }
810 } else {
811 promise.setFailure(cause);
812 }
813 });
814 return promise.asFuture().asStage();
815 }
816
817 private static <U, V> void thenAcceptAsync0(Promise<U> promise, V value, Consumer<? super V> action) {
818 try {
819 action.accept(value);
820 promise.setSuccess(null);
821 } catch (Throwable cause) {
822 promise.setFailure(cause);
823 }
824 }
825
826 @Override
827 public FutureCompletionStage<Void> thenRunAsync(Runnable action, Executor executor) {
828 return thenAcceptAsync(ignore -> action.run(), executor);
829 }
830
831 @Override
832 public <U, V1> FutureCompletionStage<V1> thenCombineAsync(
833 CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn, Executor executor) {
834 requireNonNull(other, "other");
835 requireNonNull(fn, "fn");
836 requireNonNull(executor, "executor");
837
838 Promise<V1> promise = executor().newPromise();
839 AtomicReference<Object> reference = new AtomicReference<>(Marker.EMPTY);
840
841 abstract class CombineBiConsumer<T1, T2, T> implements BiConsumer<T, Throwable> {
842 @SuppressWarnings("unchecked")
843 @Override
844 public void accept(T v, Throwable error) {
845 if (error == null) {
846 if (!reference.compareAndSet(Marker.EMPTY, v)) {
847 Object rawValue = reference.get();
848 if (rawValue == Marker.ERROR) {
849 return;
850 }
851 applyAndNotify0(promise, (T1) v, (T2) rawValue, fn);
852 }
853 } else {
854 if (reference.getAndSet(Marker.ERROR) != Marker.ERROR) {
855
856 promise.setFailure(error);
857 }
858 }
859 }
860
861 abstract void applyAndNotify0(
862 Promise<V1> promise, T1 value1, T2 value2, BiFunction<? super V, ? super U, ? extends V1> fn);
863 }
864
865 whenCompleteAsync(new CombineBiConsumer<V, U, V>() {
866 @Override
867 void applyAndNotify0(
868 Promise<V1> promise, V value1, U value2, BiFunction<? super V, ? super U, ? extends V1> fn) {
869 applyAndNotify(promise, value1, value2, fn);
870 }
871 }, executor);
872 other.whenCompleteAsync(new CombineBiConsumer<U, V, U>() {
873 @Override
874 void applyAndNotify0(
875 Promise<V1> promise, U value1, V value2, BiFunction<? super V, ? super U, ? extends V1> fn) {
876 applyAndNotify(promise, value2, value1, fn);
877 }
878 }, otherExecutor(executor));
879 return promise.asFuture().asStage();
880 }
881
882 private Executor otherExecutor(Executor executor) {
883 return executor == SAME_AS_FUTURE ? executor() : executor;
884 }
885
886 @Override
887 public <U> FutureCompletionStage<Void> thenAcceptBothAsync(
888 CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action, Executor executor) {
889 requireNonNull(action, "action");
890 return thenCombineAsync(other, (value, value2) -> {
891 action.accept(value, value2);
892 return null;
893 }, executor);
894 }
895
896 @Override
897 public FutureCompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
898 requireNonNull(action, "action");
899 return thenCombineAsync(other, (ignoreOtherValue, ignoreError) -> {
900 action.run();
901 return null;
902 }, executor);
903 }
904
905 @Override
906 public <U> FutureCompletionStage<U> applyToEitherAsync(
907 CompletionStage<? extends V> other, Function<? super V, U> fn, Executor executor) {
908 requireNonNull(other, "other");
909 requireNonNull(fn, "fn");
910
911 Promise<U> promise = executor().newPromise();
912 BiConsumer<V, Throwable> consumer = new AtomicBiConsumer<>(promise) {
913 private static final long serialVersionUID = -8454630185124276599L;
914
915 @Override
916 protected U apply(V value) {
917 return fn.apply(value);
918 }
919 };
920 whenCompleteAsync(consumer, executor);
921 other.whenCompleteAsync(consumer, otherExecutor(executor));
922 return promise.asFuture().asStage();
923 }
924
925 @Override
926 public FutureCompletionStage<Void> acceptEitherAsync(
927 CompletionStage<? extends V> other, Consumer<? super V> action, Executor executor) {
928 requireNonNull(other, "other");
929 requireNonNull(action, "action");
930
931 Promise<Void> promise = executor().newPromise();
932 BiConsumer<V, Throwable> consumer = new AtomicBiConsumer<>(promise) {
933 private static final long serialVersionUID = -8429618092318150682L;
934
935 @Override
936 protected Void apply(V value) {
937 action.accept(value);
938 return null;
939 }
940 };
941 whenCompleteAsync(consumer, executor);
942 other.whenCompleteAsync(consumer, otherExecutor(executor));
943 return promise.asFuture().asStage();
944 }
945
946 @Override
947 public FutureCompletionStage<Void> runAfterEitherAsync(
948 CompletionStage<?> other, Runnable action, Executor executor) {
949 requireNonNull(other, "other");
950 requireNonNull(action, "action");
951
952 Promise<Void> promise = executor().newPromise();
953 BiConsumer<Object, Throwable> consumer = new AtomicBiConsumer<>(promise) {
954 private static final long serialVersionUID = 5994110691767731494L;
955
956 @Override
957 protected Void apply(Object value) {
958 action.run();
959 return null;
960 }
961 };
962 whenCompleteAsync(consumer, executor);
963 other.whenCompleteAsync(consumer, otherExecutor(executor));
964 return promise.asFuture().asStage();
965 }
966
967 @Override
968 public <U> FutureCompletionStage<U> thenComposeAsync(
969 Function<? super V, ? extends CompletionStage<U>> fn, Executor executor) {
970 requireNonNull(fn, "fn");
971 requireNonNull(executor, "executor");
972
973 Promise<U> promise = executor().newPromise();
974 addListener(f -> {
975 Throwable cause = f.cause();
976 if (cause == null) {
977 V value = f.getNow();
978 if (executeDirectly(executor)) {
979 thenComposeAsync0(promise, fn, value);
980 } else {
981 safeExecute(executor, () -> thenComposeAsync0(promise, fn, value), promise);
982 }
983 } else {
984 promise.setFailure(cause);
985 }
986 });
987 return promise.asFuture().asStage();
988 }
989
990 private static <V, U> void thenComposeAsync0(
991 Promise<U> promise, Function<? super V, ? extends CompletionStage<U>> fn, V value) {
992 final CompletionStage<U> result;
993 try {
994 result = fn.apply(value);
995 } catch (Throwable cause) {
996 promise.setFailure(cause);
997 return;
998 }
999 result.whenComplete((v, error) -> {
1000 if (error == null) {
1001 promise.setSuccess(v);
1002 } else {
1003 promise.setFailure(error);
1004 }
1005 });
1006 }
1007
1008 @Override
1009 public FutureCompletionStage<V> exceptionally(Function<Throwable, ? extends V> fn) {
1010 requireNonNull(fn, "fn");
1011
1012 Promise<V> promise = executor().newPromise();
1013 addListener(f -> {
1014 Throwable error = f.cause();
1015 if (error == null) {
1016 V value = f.getNow();
1017 promise.setSuccess(value);
1018 } else {
1019 final V result;
1020 try {
1021 result = fn.apply(error);
1022 } catch (Throwable cause) {
1023 promise.setFailure(cause);
1024 return;
1025 }
1026 promise.setSuccess(result);
1027 }
1028 });
1029 return promise.asFuture().asStage();
1030 }
1031
1032 @Override
1033 public FutureCompletionStage<V> whenCompleteAsync(
1034 BiConsumer<? super V, ? super Throwable> action, Executor executor) {
1035 requireNonNull(action, "action");
1036 requireNonNull(executor, "executor");
1037
1038 Promise<V> promise = executor().newPromise();
1039 addListener(f -> {
1040 if (executeDirectly(executor)) {
1041 whenCompleteAsync0(promise, f, action);
1042 } else {
1043 safeExecute(executor, () -> whenCompleteAsync0(promise, f, action), promise);
1044 }
1045 });
1046 return promise.asFuture().asStage();
1047 }
1048
1049 private static <V> void whenCompleteAsync0(
1050 Promise<V> promise, Future<? extends V> f, BiConsumer<? super V, ? super Throwable> action) {
1051 Throwable cause = f.cause();
1052 V value = cause == null ? f.getNow() : null;
1053 try {
1054 action.accept(value, cause);
1055 } catch (Throwable error) {
1056 promise.setFailure(error);
1057 return;
1058 }
1059
1060 if (cause == null) {
1061 promise.setSuccess(value);
1062 } else {
1063 promise.setFailure(cause);
1064 }
1065 }
1066
1067 @Override
1068 public <U> FutureCompletionStage<U> handleAsync(
1069 BiFunction<? super V, Throwable, ? extends U> fn, Executor executor) {
1070 requireNonNull(fn, "fn");
1071 requireNonNull(executor, "executor");
1072
1073 Promise<U> promise = executor().newPromise();
1074 addListener(f -> {
1075 if (executeDirectly(executor)) {
1076 handleAsync0(promise, f, fn);
1077 } else {
1078 safeExecute(executor, () -> handleAsync0(promise, f, fn), promise);
1079 }
1080 });
1081 return promise.asFuture().asStage();
1082 }
1083
1084 @SuppressWarnings("unchecked")
1085 private static <U, V> void handleAsync0(
1086 Promise<U> promise, Future<? super V> f, BiFunction<? super V, Throwable, ? extends U> fn) {
1087 Throwable cause = f.cause();
1088 applyAndNotify(promise, cause == null ? (V) f.getNow() : null, cause, fn);
1089 }
1090
1091 private static <U, V, T> void applyAndNotify(
1092 Promise<U> promise, V value, T value2, BiFunction<? super V, ? super T, ? extends U> fn) {
1093 final U result;
1094 try {
1095 result = fn.apply(value, value2);
1096 } catch (Throwable error) {
1097 promise.setFailure(error);
1098 return;
1099 }
1100 promise.setSuccess(result);
1101 }
1102
1103 private static boolean executeDirectly(Executor executor) {
1104 return executor == SAME_AS_FUTURE;
1105 }
1106
1107 private static void safeExecute(Executor executor, Runnable task, Promise<?> promise) {
1108 try {
1109 executor.execute(task);
1110 } catch (Throwable cause) {
1111 promise.setFailure(cause);
1112 }
1113 }
1114
1115 private abstract static class AtomicBiConsumer<V, U> extends AtomicReference<Object>
1116 implements BiConsumer<V, Throwable> {
1117 private static final long serialVersionUID = 880039612531973027L;
1118
1119 private final Promise<U> promise;
1120
1121 AtomicBiConsumer(Promise<U> promise) {
1122 super(Marker.EMPTY);
1123 this.promise = promise;
1124 }
1125
1126 @Override
1127 public void accept(V v, Throwable error) {
1128 if (error == null) {
1129 if (compareAndSet(Marker.EMPTY, v)) {
1130 final U value;
1131 try {
1132 value = apply(v);
1133 } catch (Throwable cause) {
1134 promise.setFailure(cause);
1135 return;
1136 }
1137 promise.setSuccess(value);
1138 }
1139 } else if (compareAndSet(Marker.EMPTY, Marker.ERROR)) {
1140 promise.setFailure(error);
1141 }
1142 }
1143
1144 protected abstract U apply(V value);
1145 }
1146
1147
1148 private static final class StacklessCancellationException extends CancellationException {
1149
1150 private static final long serialVersionUID = -2974906711413716191L;
1151
1152
1153
1154 @Override
1155 public Throwable fillInStackTrace() {
1156 return this;
1157 }
1158
1159 static StacklessCancellationException newInstance(Class<?> clazz, String method) {
1160 return ThrowableUtil.unknownStackTrace(new StacklessCancellationException(), clazz, method);
1161 }
1162 }
1163 }