View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util.concurrent;
17  
18  import io.netty5.util.internal.StringUtil;
19  import io.netty5.util.internal.ThrowableUtil;
20  import io.netty5.util.internal.logging.InternalLogger;
21  import io.netty5.util.internal.logging.InternalLoggerFactory;
22  
23  import java.util.concurrent.CancellationException;
24  import java.util.concurrent.CompletionException;
25  import java.util.concurrent.CompletionStage;
26  import java.util.concurrent.ExecutionException;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.ForkJoinPool;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.TimeoutException;
31  import java.util.concurrent.atomic.AtomicReference;
32  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
33  import java.util.function.BiConsumer;
34  import java.util.function.BiFunction;
35  import java.util.function.Consumer;
36  import java.util.function.Function;
37  
38  import static java.util.Objects.requireNonNull;
39  
40  public class DefaultPromise<V> implements Promise<V>, Future<V>,
41                                            FutureCompletionStage<V>, java.util.concurrent.Future<V> {
42      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
43      private static final InternalLogger rejectedExecutionLogger =
44              InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
45      @SuppressWarnings("rawtypes")
46      private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
47              AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
48      private static final Object SUCCESS = new Object();
49      private static final Object UNCANCELLABLE = new Object();
50      private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
51              StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
52      private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
53      static final Object NULL_CONTEXT = new Object();
54  
55      private volatile Object result;
56      private final EventExecutor executor;
57  
58      /**
59       * One or more listeners. Can be a {@link FutureListener} or a {@link DefaultFutureListeners}.
60       * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
61       * <p>
62       * Note that if a {@link FutureContextListener} is added, we immediately upgrade to a {@link DefaultFutureListeners}
63       * as we otherwise wouldn't have room to store the associated context object.
64       * <p>
65       * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
66       */
67      private Object listeners;
68      /**
69       * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
70       */
71      private short waiters;
72  
73      /**
74       * Creates a new unfulfilled promise.
75       *
76       * This constructor is only meant to be used by sub-classes.
77       *
78       * @param executor
79       *        The {@link EventExecutor} which is used to notify the promise once it is complete.
80       *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
81       *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
82       *        depth exceeds a threshold.
83       */
84      protected DefaultPromise(EventExecutor executor) {
85          this.executor = requireNonNull(executor, "executor");
86      }
87  
88      /**
89       * Creates a new promise that has already been completed successfully.
90       *
91       * @param executor
92       *        The {@link EventExecutor} which is used to notify the promise once it is complete.
93       *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
94       *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
95       *        depth exceeds a threshold.
96       * @param result The result of the successful promise.
97       */
98      static <V> Promise<V> newSuccessfulPromise(EventExecutor executor, V result) {
99          return new DefaultPromise<>(executor, result);
100     }
101 
102     /**
103      * Creates a new promise that has already failed.
104      *
105      * @param executor
106      *        The {@link EventExecutor} which is used to notify the promise once it is complete.
107      *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
108      *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
109      *        depth exceeds a threshold.
110      * @param cause The {@link Throwable} that caused the failure of the returned promise.
111      */
112     static <V> Promise<V> newFailedPromise(EventExecutor executor, Throwable cause) {
113         return new DefaultPromise<>(cause, executor);
114     }
115 
116     private DefaultPromise(EventExecutor executor, Object result) {
117         this.executor = requireNonNull(executor, "executor");
118         this.result = result == null? SUCCESS : result;
119     }
120 
121     private DefaultPromise(Throwable cause, EventExecutor executor) {
122         this.executor = requireNonNull(executor, "executor");
123         result = new CauseHolder(requireNonNull(cause, "cause"));
124     }
125 
126     @Override
127     public Promise<V> setSuccess(V result) {
128         if (setSuccess0(result)) {
129             return this;
130         }
131         throw new IllegalStateException("complete already: " + this);
132     }
133 
134     @Override
135     public boolean trySuccess(V result) {
136         return setSuccess0(result);
137     }
138 
139     @Override
140     public Promise<V> setFailure(Throwable cause) {
141         if (setFailure0(cause)) {
142             return this;
143         }
144         throw new IllegalStateException("complete already: " + this, cause);
145     }
146 
147     @Override
148     public boolean tryFailure(Throwable cause) {
149         return setFailure0(cause);
150     }
151 
152     @Override
153     public boolean setUncancellable() {
154         return RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE);
155     }
156 
157     @Override
158     public Future<V> asFuture() {
159         return this;
160     }
161 
162     @Override
163     public boolean isSuccess() {
164         Object result = this.result;
165         return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
166     }
167 
168     @Override
169     public boolean isFailed() {
170         return result instanceof CauseHolder;
171     }
172 
173     @Override
174     public boolean isCancellable() {
175         return result == null;
176     }
177 
178     private static final class LeanCancellationException extends CancellationException {
179         private static final long serialVersionUID = 2794674970981187807L;
180 
181         // Suppress a warning since the method doesn't need synchronization
182         @Override
183         public Throwable fillInStackTrace() {   // lgtm[java/non-sync-override]
184             setStackTrace(CANCELLATION_STACK);
185             return this;
186         }
187 
188         @Override
189         public String toString() {
190             return CancellationException.class.getName();
191         }
192     }
193 
194     @Override
195     public Throwable cause() {
196         return cause0(result);
197     }
198 
199     private Throwable cause0(Object result) {
200         if (!isDone0(result)) {
201             throw new IllegalStateException("Cannot call cause() on a future that has not completed.");
202         }
203         if (!(result instanceof CauseHolder)) {
204             return null;
205         }
206         if (result == CANCELLATION_CAUSE_HOLDER) {
207             CancellationException ce = new LeanCancellationException();
208             if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
209                 return ce;
210             }
211             result = this.result;
212         }
213         return ((CauseHolder) result).cause;
214     }
215 
216     @Override
217     public Future<V> addListener(FutureListener<? super V> listener) {
218         requireNonNull(listener, "listener");
219 
220         addListener0(listener, null);
221         if (isDone()) {
222             notifyListeners();
223         }
224 
225         return this;
226     }
227 
228     @Override
229     public <C> Future<V> addListener(C context, FutureContextListener<? super C, ? super V> listener) {
230         requireNonNull(listener, "listener");
231 
232         addListener0(listener, context == null ? NULL_CONTEXT : context);
233         if (isDone()) {
234             notifyListeners();
235         }
236 
237         return this;
238     }
239 
240     @Override
241     public FutureCompletionStage<V> await() throws InterruptedException {
242         if (isDone()) {
243             return this;
244         }
245 
246         if (Thread.interrupted()) {
247             throw new InterruptedException(toString());
248         }
249 
250         checkDeadLock();
251 
252         synchronized (this) {
253             while (!isDone()) {
254                 incWaiters();
255                 try {
256                     wait();
257                 } finally {
258                     decWaiters();
259                 }
260             }
261         }
262         return this;
263     }
264 
265     @Override
266     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
267         return await0(unit.toNanos(timeout), true);
268     }
269 
270     @SuppressWarnings("unchecked")
271     @Override
272     public V getNow() {
273         Object result = this.result;
274         if (!isDone0(result)) {
275             throw new IllegalStateException("Cannot call getNow() on a future that has not completed.");
276         }
277         if (result instanceof CauseHolder || result == SUCCESS) {
278             return null;
279         }
280         return (V) result;
281     }
282 
283     @SuppressWarnings("unchecked")
284     @Override
285     public V get() throws InterruptedException, ExecutionException {
286         Object result = this.result;
287         if (!isDone0(result)) {
288             await();
289             result = this.result;
290         }
291         if (result == SUCCESS || result == UNCANCELLABLE) {
292             return null;
293         }
294         Throwable cause = cause0(result);
295         if (cause == null) {
296             return (V) result;
297         }
298         if (cause instanceof CancellationException) {
299             throw (CancellationException) cause;
300         }
301         throw new ExecutionException(cause);
302     }
303 
304     @SuppressWarnings("unchecked")
305     @Override
306     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
307         Object result = this.result;
308         if (!isDone0(result)) {
309             if (!await(timeout, unit)) {
310                 throw new TimeoutException();
311             }
312             result = this.result;
313         }
314         if (result == SUCCESS || result == UNCANCELLABLE) {
315             return null;
316         }
317         Throwable cause = cause0(result);
318         if (cause == null) {
319             return (V) result;
320         }
321         if (cause instanceof CancellationException) {
322             throw (CancellationException) cause;
323         }
324         throw new ExecutionException(cause);
325     }
326 
327     @Override
328     public boolean cancel() {
329         if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
330             if (checkNotifyWaiters()) {
331                 notifyListeners();
332             }
333             return true;
334         }
335         return false;
336     }
337 
338     @Override
339     public boolean isCancelled() {
340         return isCancelled0(result);
341     }
342 
343     @Override
344     public boolean isDone() {
345         return isDone0(result);
346     }
347 
348     @Override
349     public FutureCompletionStage<V> sync() throws InterruptedException {
350         await();
351         rethrowIfFailed();
352         return this;
353     }
354 
355     @Override
356     public String toString() {
357         return toStringBuilder().toString();
358     }
359 
360     protected StringBuilder toStringBuilder() {
361         StringBuilder buf = new StringBuilder(64)
362                 .append(StringUtil.simpleClassName(this))
363                 .append('@')
364                 .append(Integer.toHexString(hashCode()));
365 
366         Object result = this.result;
367         if (result == SUCCESS) {
368             buf.append("(success)");
369         } else if (result == UNCANCELLABLE) {
370             buf.append("(uncancellable)");
371         } else if (result instanceof CauseHolder) {
372             buf.append("(failure: ")
373                     .append(((CauseHolder) result).cause)
374                     .append(')');
375         } else if (result != null) {
376             buf.append("(success: ")
377                     .append(result)
378                     .append(')');
379         } else {
380             buf.append("(incomplete)");
381         }
382 
383         return buf;
384     }
385 
386     /**
387      * Get the executor used to notify listeners when this promise is complete.
388      * <p>
389      * It is assumed this executor will protect against {@link StackOverflowError} exceptions.
390      * The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
391      * depth exceeds a threshold.
392      * @return The executor used to notify listeners when this promise is complete.
393      */
394     @Override
395     public final EventExecutor executor() {
396         return executor;
397     }
398 
399     protected void checkDeadLock() {
400         checkDeadLock(executor);
401     }
402 
403     protected final void checkDeadLock(EventExecutor executor) {
404         if (executor.inEventLoop()) {
405             throw new BlockingOperationException(toString());
406         }
407     }
408 
409     private void notifyListeners() {
410         safeExecute(executor(), new NotifyListeners(this));
411     }
412 
413     private static final class NotifyListeners implements Runnable {
414         private final DefaultPromise<?> promise;
415 
416         private NotifyListeners(DefaultPromise<?> promise) {
417             this.promise = promise;
418         }
419 
420         @Override
421         public void run() {
422             promise.notifyListenersNow();
423         }
424     }
425 
426     @SuppressWarnings({ "unchecked", "MethodOnlyUsedFromInnerClass" })
427     private void notifyListenersNow() {
428         Object listeners;
429         synchronized (this) {
430             // Only proceed if there are listeners to notify.
431             if (this.listeners == null) {
432                 return;
433             }
434             listeners = this.listeners;
435             this.listeners = null;
436         }
437         for (;;) {
438             if (listeners instanceof DefaultFutureListeners) {
439                 notifyListeners0((DefaultFutureListeners) listeners);
440             } else {
441                 notifyListener0(this, (FutureListener<V>) listeners);
442             }
443             synchronized (this) {
444                 if (this.listeners == null) {
445                     return;
446                 }
447                 listeners = this.listeners;
448                 this.listeners = null;
449             }
450         }
451     }
452 
453     private void notifyListeners0(DefaultFutureListeners listeners) {
454         listeners.notifyListeners(this, logger);
455     }
456 
457     static <V> void notifyListener0(Future<V> future, FutureListener<? super V> l) {
458         try {
459             l.operationComplete(future);
460         } catch (Throwable t) {
461             if (logger.isWarnEnabled()) {
462                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
463             }
464         }
465     }
466 
467     private synchronized void addListener0(Object listener, Object context) {
468         if (listeners == null && context == null) {
469             listeners = listener;
470         } else if (listeners instanceof DefaultFutureListeners) {
471             ((DefaultFutureListeners) listeners).add(listener, context);
472         } else {
473             DefaultFutureListeners listeners = new DefaultFutureListeners();
474             if (this.listeners != null) {
475                 listeners.add(this.listeners, null);
476             }
477             listeners.add(listener, context);
478             this.listeners = listeners;
479         }
480     }
481 
482     private boolean setSuccess0(V result) {
483         return setValue0(result == null ? SUCCESS : result);
484     }
485 
486     private boolean setFailure0(Throwable cause) {
487         return setValue0(new CauseHolder(requireNonNull(cause, "cause")));
488     }
489 
490     private boolean setValue0(Object objResult) {
491         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
492             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
493             if (checkNotifyWaiters()) {
494                 notifyListeners();
495             }
496             return true;
497         }
498         return false;
499     }
500 
501     /**
502      * Check if there are any waiters and if so notify these.
503      * @return {@code true} if there are any listeners attached to the promise, {@code false} otherwise.
504      */
505     private synchronized boolean checkNotifyWaiters() {
506         if (waiters > 0) {
507             notifyAll();
508         }
509         return listeners != null;
510     }
511 
512     private void incWaiters() {
513         if (waiters == Short.MAX_VALUE) {
514             throw new IllegalStateException("too many waiters: " + this);
515         }
516         ++waiters;
517     }
518 
519     private void decWaiters() {
520         --waiters;
521     }
522 
523     private void rethrowIfFailed() {
524         Throwable cause = cause();
525         if (cause == null) {
526             return;
527         }
528         if (cause instanceof CancellationException) {
529             throw (CancellationException) cause;
530         }
531         throw new CompletionException(cause);
532     }
533 
534     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
535         if (isDone()) {
536             return true;
537         }
538 
539         if (timeoutNanos <= 0) {
540             return isDone();
541         }
542 
543         if (interruptable && Thread.interrupted()) {
544             throw new InterruptedException(toString());
545         }
546 
547         checkDeadLock();
548 
549         // Start counting time from here instead of the first line of this method,
550         // to avoid/postpone performance cost of System.nanoTime().
551         final long startTime = System.nanoTime();
552         synchronized (this) {
553             boolean interrupted = false;
554             try {
555                 long waitTime = timeoutNanos;
556                 while (!isDone() && waitTime > 0) {
557                     incWaiters();
558                     try {
559                         wait(waitTime / 1000000, (int) (waitTime % 1000000));
560                     } catch (InterruptedException e) {
561                         if (interruptable) {
562                             throw e;
563                         } else {
564                             interrupted = true;
565                         }
566                     } finally {
567                         decWaiters();
568                     }
569                     // Check isDone() in advance, try to avoid calculating the elapsed time later.
570                     if (isDone()) {
571                         return true;
572                     }
573                     // Calculate the elapsed time here instead of in the while condition,
574                     // try to avoid performance cost of System.nanoTime() in the first loop of while.
575                     waitTime = timeoutNanos - (System.nanoTime() - startTime);
576                 }
577                 return isDone();
578             } finally {
579                 if (interrupted) {
580                     Thread.currentThread().interrupt();
581                 }
582             }
583         }
584     }
585 
586     private static boolean isCancelled0(Object result) {
587         return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
588     }
589 
590     private static boolean isDone0(Object result) {
591         return result != null && result != UNCANCELLABLE;
592     }
593 
594     private static final class CauseHolder {
595         final Throwable cause;
596         CauseHolder(Throwable cause) {
597             this.cause = cause;
598         }
599     }
600 
601     static void safeExecute(EventExecutor executor, Runnable task) {
602         try {
603             executor.execute(task);
604         } catch (Throwable t) {
605             rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
606         }
607     }
608 
609     @Override
610     public FutureCompletionStage<V> asStage() {
611         return this;
612     }
613 
614     // <editor-fold defaultstate="collapsed" desc="CompletionStage and JDK Future implementation.">
615     private enum Marker {
616         EMPTY,
617         ERROR
618     }
619 
620     // Just a marker
621     private static final Executor SAME_AS_FUTURE = task -> {
622         throw new UnsupportedOperationException("Marker executor. Should never be called!");
623     };
624 
625     @Override
626     public boolean cancel(boolean mayInterruptIfRunning) {
627         return cancel();
628     }
629 
630     @Override
631     public Future<V> future() {
632         return this;
633     }
634 
635     @Override
636     public <U> FutureCompletionStage<U> thenApply(Function<? super V, ? extends U> fn) {
637         return thenApplyAsync(fn, SAME_AS_FUTURE);
638     }
639 
640     @Override
641     public <U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn) {
642         return thenApplyAsync(fn, ForkJoinPool.commonPool());
643     }
644 
645     @Override
646     public FutureCompletionStage<Void> thenAccept(Consumer<? super V> action) {
647         return thenAcceptAsync(action, SAME_AS_FUTURE);
648     }
649 
650     @Override
651     public FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action) {
652         return thenAcceptAsync(action, ForkJoinPool.commonPool());
653     }
654 
655     @Override
656     public FutureCompletionStage<Void> thenRun(Runnable action) {
657         return thenRunAsync(action, SAME_AS_FUTURE);
658     }
659 
660     @Override
661     public FutureCompletionStage<Void> thenRunAsync(Runnable action) {
662         return thenRunAsync(action, ForkJoinPool.commonPool());
663     }
664 
665     @Override
666     public  <U, V1> FutureCompletionStage<V1> thenCombine(
667             CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn) {
668         return thenCombineAsync(other, fn, SAME_AS_FUTURE);
669     }
670 
671     @Override
672     public <U, V1> FutureCompletionStage<V1> thenCombineAsync(
673             CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn) {
674         return thenCombineAsync(other, fn, ForkJoinPool.commonPool());
675     }
676 
677     @Override
678     public  <U> FutureCompletionStage<Void> thenAcceptBoth(
679             CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action) {
680         return thenAcceptBothAsync(other, action, SAME_AS_FUTURE);
681     }
682 
683     @Override
684     public <U> FutureCompletionStage<Void> thenAcceptBothAsync(
685             CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action) {
686         return thenAcceptBothAsync(other, action, ForkJoinPool.commonPool());
687     }
688 
689     @Override
690     public FutureCompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
691         return runAfterBothAsync(other, action, SAME_AS_FUTURE);
692     }
693 
694     @Override
695     public FutureCompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
696         return runAfterBothAsync(other, action, ForkJoinPool.commonPool());
697     }
698 
699     @Override
700     public  <U> FutureCompletionStage<U> applyToEither(
701             CompletionStage<? extends V> other, Function<? super V, U> fn) {
702         return applyToEitherAsync(other, fn, SAME_AS_FUTURE);
703     }
704 
705     @Override
706     public <U> FutureCompletionStage<U> applyToEitherAsync(
707             CompletionStage<? extends V> other, Function<? super V, U> fn) {
708         return applyToEitherAsync(other, fn, ForkJoinPool.commonPool());
709     }
710 
711     @Override
712     public FutureCompletionStage<Void> acceptEither(CompletionStage<? extends V> other, Consumer<? super V> action) {
713         return acceptEitherAsync(other, action, SAME_AS_FUTURE);
714     }
715 
716     @Override
717     public FutureCompletionStage<Void> acceptEitherAsync(
718             CompletionStage<? extends V> other, Consumer<? super V> action) {
719         return acceptEitherAsync(other, action, ForkJoinPool.commonPool());
720     }
721 
722     @Override
723     public FutureCompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action) {
724         return runAfterEitherAsync(other, action, SAME_AS_FUTURE);
725     }
726 
727     @Override
728     public FutureCompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
729         return runAfterEitherAsync(other, action, ForkJoinPool.commonPool());
730     }
731 
732     @Override
733     public <U> FutureCompletionStage<U> thenCompose(Function<? super V, ? extends CompletionStage<U>> fn) {
734         return thenComposeAsync(fn, SAME_AS_FUTURE);
735     }
736 
737     @Override
738     public <U> FutureCompletionStage<U> thenComposeAsync(Function<? super V, ? extends CompletionStage<U>> fn) {
739         return thenComposeAsync(fn, ForkJoinPool.commonPool());
740     }
741 
742     @Override
743     public FutureCompletionStage<V> whenComplete(BiConsumer<? super V, ? super Throwable> action) {
744         return whenCompleteAsync(action, SAME_AS_FUTURE);
745     }
746 
747     @Override
748     public FutureCompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action) {
749         return whenCompleteAsync(action, ForkJoinPool.commonPool());
750     }
751 
752     @Override
753     public <U> FutureCompletionStage<U> handle(BiFunction<? super V, Throwable, ? extends U> fn) {
754         return handleAsync(fn, SAME_AS_FUTURE);
755     }
756 
757     @Override
758     public <U> FutureCompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn) {
759         return handleAsync(fn, ForkJoinPool.commonPool());
760     }
761 
762     @Override
763     public <U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn, Executor executor) {
764         requireNonNull(fn, "fn");
765         requireNonNull(executor, "executor");
766 
767         Promise<U> promise = executor().newPromise();
768         addListener(future -> {
769             Throwable cause = future.cause();
770             if (cause == null) {
771                 V value = future.getNow();
772                 if (executeDirectly(executor)) {
773                     thenApplyAsync0(promise, value, fn);
774                 } else {
775                     safeExecute(executor, () -> thenApplyAsync0(promise, value, fn), promise);
776                 }
777             } else {
778                 promise.setFailure(cause);
779             }
780         });
781         return promise.asFuture().asStage();
782     }
783 
784     private static <U, V> void thenApplyAsync0(Promise<U> promise, V value, Function<? super V, ? extends U> fn) {
785         final U result;
786         try {
787             result = fn.apply(value);
788         } catch (Throwable cause) {
789             promise.setFailure(cause);
790             return;
791         }
792         promise.setSuccess(result);
793     }
794 
795     @Override
796     public FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action, Executor executor) {
797         requireNonNull(action, "action");
798         requireNonNull(executor, "executor");
799 
800         Promise<Void> promise = executor().newPromise();
801         addListener(future -> {
802             Throwable cause = future.cause();
803             if (cause == null) {
804                 V value = future.getNow();
805                 if (executeDirectly(executor)) {
806                     thenAcceptAsync0(promise, value, action);
807                 } else {
808                     safeExecute(executor, () -> thenAcceptAsync0(promise, value, action), promise);
809                 }
810             } else {
811                 promise.setFailure(cause);
812             }
813         });
814         return promise.asFuture().asStage();
815     }
816 
817     private static <U, V> void thenAcceptAsync0(Promise<U> promise, V value, Consumer<? super V> action) {
818         try {
819             action.accept(value);
820             promise.setSuccess(null);
821         } catch (Throwable cause) {
822             promise.setFailure(cause);
823         }
824     }
825 
826     @Override
827     public FutureCompletionStage<Void> thenRunAsync(Runnable action, Executor executor) {
828         return thenAcceptAsync(ignore -> action.run(), executor);
829     }
830 
831     @Override
832     public <U, V1> FutureCompletionStage<V1> thenCombineAsync(
833             CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn, Executor executor) {
834         requireNonNull(other, "other");
835         requireNonNull(fn, "fn");
836         requireNonNull(executor, "executor");
837 
838         Promise<V1> promise = executor().newPromise();
839         AtomicReference<Object> reference = new AtomicReference<>(Marker.EMPTY);
840 
841         abstract class CombineBiConsumer<T1, T2, T> implements BiConsumer<T, Throwable> {
842             @SuppressWarnings("unchecked")
843             @Override
844             public void accept(T v, Throwable error) {
845                 if (error == null) {
846                     if (!reference.compareAndSet(Marker.EMPTY, v)) {
847                         Object rawValue = reference.get();
848                         if (rawValue == Marker.ERROR) {
849                             return;
850                         }
851                         applyAndNotify0(promise, (T1) v, (T2) rawValue, fn);
852                     }
853                 } else {
854                     if (reference.getAndSet(Marker.ERROR) != Marker.ERROR) {
855                         // Did not fail the promise before, do it now.
856                         promise.setFailure(error);
857                     }
858                 }
859             }
860 
861             abstract void applyAndNotify0(
862                     Promise<V1> promise, T1 value1, T2 value2, BiFunction<? super V, ? super U, ? extends V1> fn);
863         }
864 
865         whenCompleteAsync(new CombineBiConsumer<V, U, V>() {
866             @Override
867             void applyAndNotify0(
868                     Promise<V1> promise, V value1, U value2, BiFunction<? super V, ? super U, ? extends V1> fn) {
869                 applyAndNotify(promise, value1, value2, fn);
870             }
871         }, executor);
872         other.whenCompleteAsync(new CombineBiConsumer<U, V, U>() {
873             @Override
874             void applyAndNotify0(
875                     Promise<V1> promise, U value1, V value2, BiFunction<? super V, ? super U, ? extends V1> fn) {
876                 applyAndNotify(promise, value2, value1, fn);
877             }
878         }, otherExecutor(executor));
879         return promise.asFuture().asStage();
880     }
881 
882     private Executor otherExecutor(Executor executor) {
883         return executor == SAME_AS_FUTURE ? executor() : executor;
884     }
885 
886     @Override
887     public <U> FutureCompletionStage<Void> thenAcceptBothAsync(
888             CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action, Executor executor) {
889         requireNonNull(action, "action");
890         return thenCombineAsync(other, (value, value2) -> {
891             action.accept(value, value2);
892             return null;
893         }, executor);
894     }
895 
896     @Override
897     public FutureCompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
898         requireNonNull(action, "action");
899         return thenCombineAsync(other, (ignoreOtherValue, ignoreError) -> {
900             action.run();
901             return null;
902         }, executor);
903     }
904 
905     @Override
906     public <U> FutureCompletionStage<U> applyToEitherAsync(
907             CompletionStage<? extends V> other, Function<? super V, U> fn, Executor executor) {
908         requireNonNull(other, "other");
909         requireNonNull(fn, "fn");
910 
911         Promise<U> promise = executor().newPromise();
912         BiConsumer<V, Throwable> consumer = new AtomicBiConsumer<>(promise) {
913             private static final long serialVersionUID = -8454630185124276599L;
914 
915             @Override
916             protected U apply(V value) {
917                 return fn.apply(value);
918             }
919         };
920         whenCompleteAsync(consumer, executor);
921         other.whenCompleteAsync(consumer, otherExecutor(executor));
922         return promise.asFuture().asStage();
923     }
924 
925     @Override
926     public FutureCompletionStage<Void> acceptEitherAsync(
927             CompletionStage<? extends V> other, Consumer<? super V> action, Executor executor) {
928         requireNonNull(other, "other");
929         requireNonNull(action, "action");
930 
931         Promise<Void> promise = executor().newPromise();
932         BiConsumer<V, Throwable> consumer = new AtomicBiConsumer<>(promise) {
933             private static final long serialVersionUID = -8429618092318150682L;
934 
935             @Override
936             protected Void apply(V value) {
937                 action.accept(value);
938                 return null;
939             }
940         };
941         whenCompleteAsync(consumer, executor);
942         other.whenCompleteAsync(consumer, otherExecutor(executor));
943         return promise.asFuture().asStage();
944     }
945 
946     @Override
947     public FutureCompletionStage<Void> runAfterEitherAsync(
948             CompletionStage<?> other, Runnable action, Executor executor) {
949         requireNonNull(other, "other");
950         requireNonNull(action, "action");
951 
952         Promise<Void> promise = executor().newPromise();
953         BiConsumer<Object, Throwable> consumer = new AtomicBiConsumer<>(promise) {
954             private static final long serialVersionUID = 5994110691767731494L;
955 
956             @Override
957             protected Void apply(Object value) {
958                 action.run();
959                 return null;
960             }
961         };
962         whenCompleteAsync(consumer, executor);
963         other.whenCompleteAsync(consumer, otherExecutor(executor));
964         return promise.asFuture().asStage();
965     }
966 
967     @Override
968     public <U> FutureCompletionStage<U> thenComposeAsync(
969             Function<? super V, ? extends CompletionStage<U>> fn, Executor executor) {
970         requireNonNull(fn, "fn");
971         requireNonNull(executor, "executor");
972 
973         Promise<U> promise = executor().newPromise();
974         addListener(f -> {
975             Throwable cause = f.cause();
976             if (cause == null) {
977                 V value = f.getNow();
978                 if (executeDirectly(executor)) {
979                     thenComposeAsync0(promise, fn, value);
980                 } else {
981                     safeExecute(executor, () -> thenComposeAsync0(promise, fn, value), promise);
982                 }
983             } else {
984                 promise.setFailure(cause);
985             }
986         });
987         return promise.asFuture().asStage();
988     }
989 
990     private static <V, U> void thenComposeAsync0(
991             Promise<U> promise, Function<? super V, ? extends CompletionStage<U>> fn, V value) {
992         final CompletionStage<U> result;
993         try {
994             result = fn.apply(value);
995         } catch (Throwable cause) {
996             promise.setFailure(cause);
997             return;
998         }
999         result.whenComplete((v, error) -> {
1000             if (error == null) {
1001                 promise.setSuccess(v);
1002             } else {
1003                 promise.setFailure(error);
1004             }
1005         });
1006     }
1007 
1008     @Override
1009     public FutureCompletionStage<V> exceptionally(Function<Throwable, ? extends V> fn) {
1010         requireNonNull(fn, "fn");
1011 
1012         Promise<V> promise = executor().newPromise();
1013         addListener(f -> {
1014             Throwable error = f.cause();
1015             if (error == null) {
1016                 V value = f.getNow();
1017                 promise.setSuccess(value);
1018             } else {
1019                 final V result;
1020                 try {
1021                     result = fn.apply(error);
1022                 } catch (Throwable cause) {
1023                     promise.setFailure(cause);
1024                     return;
1025                 }
1026                 promise.setSuccess(result);
1027             }
1028         });
1029         return promise.asFuture().asStage();
1030     }
1031 
1032     @Override
1033     public FutureCompletionStage<V> whenCompleteAsync(
1034             BiConsumer<? super V, ? super Throwable> action, Executor executor) {
1035         requireNonNull(action, "action");
1036         requireNonNull(executor, "executor");
1037 
1038         Promise<V> promise = executor().newPromise();
1039         addListener(f -> {
1040             if (executeDirectly(executor)) {
1041                 whenCompleteAsync0(promise, f, action);
1042             } else {
1043                 safeExecute(executor, () -> whenCompleteAsync0(promise, f, action), promise);
1044             }
1045         });
1046         return promise.asFuture().asStage();
1047     }
1048 
1049     private static <V> void whenCompleteAsync0(
1050             Promise<V> promise, Future<? extends V> f, BiConsumer<? super V, ? super Throwable> action) {
1051         Throwable cause = f.cause();
1052         V value = cause == null ? f.getNow() : null;
1053         try {
1054             action.accept(value, cause);
1055         } catch (Throwable error) {
1056             promise.setFailure(error);
1057             return;
1058         }
1059 
1060         if (cause == null) {
1061             promise.setSuccess(value);
1062         } else {
1063             promise.setFailure(cause);
1064         }
1065     }
1066 
1067     @Override
1068     public <U> FutureCompletionStage<U> handleAsync(
1069             BiFunction<? super V, Throwable, ? extends U> fn, Executor executor) {
1070         requireNonNull(fn, "fn");
1071         requireNonNull(executor, "executor");
1072 
1073         Promise<U> promise = executor().newPromise();
1074         addListener(f -> {
1075             if (executeDirectly(executor)) {
1076                 handleAsync0(promise, f, fn);
1077             } else {
1078                 safeExecute(executor, () -> handleAsync0(promise, f, fn), promise);
1079             }
1080         });
1081         return promise.asFuture().asStage();
1082     }
1083 
1084     @SuppressWarnings("unchecked")
1085     private static <U, V> void handleAsync0(
1086             Promise<U> promise, Future<? super V> f, BiFunction<? super V, Throwable, ? extends U> fn) {
1087         Throwable cause = f.cause();
1088         applyAndNotify(promise, cause == null ? (V) f.getNow() : null, cause, fn);
1089     }
1090 
1091     private static <U, V, T> void applyAndNotify(
1092             Promise<U> promise, V value, T value2, BiFunction<? super V, ? super T, ? extends U> fn) {
1093         final U result;
1094         try {
1095             result = fn.apply(value, value2);
1096         } catch (Throwable error) {
1097             promise.setFailure(error);
1098             return;
1099         }
1100         promise.setSuccess(result);
1101     }
1102 
1103     private static boolean executeDirectly(Executor executor) {
1104         return executor == SAME_AS_FUTURE;
1105     }
1106 
1107     private static void safeExecute(Executor executor, Runnable task, Promise<?> promise) {
1108         try {
1109             executor.execute(task);
1110         } catch (Throwable cause) {
1111             promise.setFailure(cause);
1112         }
1113     }
1114 
1115     private abstract static class AtomicBiConsumer<V, U> extends AtomicReference<Object>
1116             implements BiConsumer<V, Throwable> {
1117         private static final long serialVersionUID = 880039612531973027L;
1118 
1119         private final Promise<U> promise;
1120 
1121         AtomicBiConsumer(Promise<U> promise) {
1122             super(Marker.EMPTY);
1123             this.promise = promise;
1124         }
1125 
1126         @Override
1127         public void accept(V v, Throwable error) {
1128             if (error == null) {
1129                 if (compareAndSet(Marker.EMPTY, v)) {
1130                     final U value;
1131                     try {
1132                         value = apply(v);
1133                     } catch (Throwable cause) {
1134                         promise.setFailure(cause);
1135                         return;
1136                     }
1137                     promise.setSuccess(value);
1138                 }
1139             } else if (compareAndSet(Marker.EMPTY, Marker.ERROR)) {
1140                 promise.setFailure(error);
1141             }
1142         }
1143 
1144         protected abstract U apply(V value);
1145     }
1146     // </editor-fold>
1147 
1148     private static final class StacklessCancellationException extends CancellationException {
1149 
1150         private static final long serialVersionUID = -2974906711413716191L;
1151 
1152         // Override fillInStackTrace() so we not populate the backtrace via a native call and so leak the
1153         // Classloader.
1154         @Override
1155         public Throwable fillInStackTrace() {
1156             return this;
1157         }
1158 
1159         static StacklessCancellationException newInstance(Class<?> clazz, String method) {
1160             return ThrowableUtil.unknownStackTrace(new StacklessCancellationException(), clazz, method);
1161         }
1162     }
1163 }