View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.InternalThreadLocalMap;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.StringUtil;
21  import io.netty.util.internal.SystemPropertyUtil;
22  import io.netty.util.internal.ThrowableUtil;
23  import io.netty.util.internal.logging.InternalLogger;
24  import io.netty.util.internal.logging.InternalLoggerFactory;
25  
26  import java.util.Locale;
27  import java.util.concurrent.CancellationException;
28  import java.util.concurrent.CompletionException;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.TimeoutException;
32  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
33  
34  import static io.netty.util.internal.ObjectUtil.checkNotNull;
35  import static java.util.concurrent.TimeUnit.MILLISECONDS;
36  
37  public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
38      /**
39       * System property with integer type value, that determine the max reentrancy/recursion level for when
40       * listener notifications prompt other listeners to be notified.
41       * <p>
42       * When the reentrancy/recursion level becomes greater than this number, a new task will instead be scheduled
43       * on the event loop, to finish notifying any subsequent listners.
44       * <p>
45       * The default value is {@code 8}.
46       */
47      public static final String PROPERTY_MAX_LISTENER_STACK_DEPTH = "io.netty.defaultPromise.maxListenerStackDepth";
48  
49      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
50      private static final InternalLogger rejectedExecutionLogger =
51              InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
52      private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
53              SystemPropertyUtil.getInt(PROPERTY_MAX_LISTENER_STACK_DEPTH, 8));
54      @SuppressWarnings("rawtypes")
55      private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
56              AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
57      private static final Object SUCCESS = new Object();
58      private static final Object UNCANCELLABLE = new Object();
59      private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
60              StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
61      private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
62  
63      private volatile Object result;
64      private final EventExecutor executor;
65  
66      /**
67       * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
68       * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
69       * <p>
70       * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
71       */
72      private GenericFutureListener<? extends Future<?>> listener;
73      private DefaultFutureListeners listeners;
74      /**
75       * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
76       */
77      private short waiters;
78  
79      /**
80       * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
81       * executor changes.
82       */
83      private boolean notifyingListeners;
84  
85      /**
86       * Creates a new instance.
87       * <p>
88       * It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
89       *
90       * @param executor
91       *        the {@link EventExecutor} which is used to notify the promise once it is complete.
92       *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
93       *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
94       *        depth exceeds a threshold.
95       *
96       */
97      public DefaultPromise(EventExecutor executor) {
98          this.executor = checkNotNull(executor, "executor");
99      }
100 
101     /**
102      * See {@link #executor()} for expectations of the executor.
103      */
104     protected DefaultPromise() {
105         // only for subclasses
106         executor = null;
107     }
108 
109     @Override
110     public Promise<V> setSuccess(V result) {
111         if (setSuccess0(result)) {
112             return this;
113         }
114         throw new IllegalStateException("complete already: " + this);
115     }
116 
117     @Override
118     public boolean trySuccess(V result) {
119         return setSuccess0(result);
120     }
121 
122     @Override
123     public Promise<V> setFailure(Throwable cause) {
124         if (setFailure0(cause)) {
125             return this;
126         }
127         throw new IllegalStateException("complete already: " + this, cause);
128     }
129 
130     @Override
131     public boolean tryFailure(Throwable cause) {
132         return setFailure0(cause);
133     }
134 
135     @Override
136     public boolean setUncancellable() {
137         if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
138             return true;
139         }
140         Object result = this.result;
141         return !isDone0(result) || !isCancelled0(result);
142     }
143 
144     @Override
145     public boolean isSuccess() {
146         Object result = this.result;
147         return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
148     }
149 
150     @Override
151     public boolean isCancellable() {
152         return result == null;
153     }
154 
155     private static final class LeanCancellationException extends CancellationException {
156         private static final long serialVersionUID = 2794674970981187807L;
157 
158         // Suppress a warning since the method doesn't need synchronization
159         @Override
160         public Throwable fillInStackTrace() {
161             setStackTrace(CANCELLATION_STACK);
162             return this;
163         }
164 
165         @Override
166         public String toString() {
167             return CancellationException.class.getName();
168         }
169     }
170 
171     @Override
172     public Throwable cause() {
173         return cause0(result);
174     }
175 
176     private Throwable cause0(Object result) {
177         if (!(result instanceof CauseHolder)) {
178             return null;
179         }
180         if (result == CANCELLATION_CAUSE_HOLDER) {
181             CancellationException ce = new LeanCancellationException();
182             if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
183                 return ce;
184             }
185             result = this.result;
186         }
187         return ((CauseHolder) result).cause;
188     }
189 
190     @Override
191     public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
192         checkNotNull(listener, "listener");
193 
194         synchronized (this) {
195             addListener0(listener);
196         }
197 
198         if (isDone()) {
199             notifyListeners();
200         }
201 
202         return this;
203     }
204 
205     @Override
206     public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
207         checkNotNull(listeners, "listeners");
208 
209         synchronized (this) {
210             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
211                 if (listener == null) {
212                     break;
213                 }
214                 addListener0(listener);
215             }
216         }
217 
218         if (isDone()) {
219             notifyListeners();
220         }
221 
222         return this;
223     }
224 
225     @Override
226     public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
227         checkNotNull(listener, "listener");
228 
229         synchronized (this) {
230             removeListener0(listener);
231         }
232 
233         return this;
234     }
235 
236     @Override
237     public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
238         checkNotNull(listeners, "listeners");
239 
240         synchronized (this) {
241             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
242                 if (listener == null) {
243                     break;
244                 }
245                 removeListener0(listener);
246             }
247         }
248 
249         return this;
250     }
251 
252     @Override
253     public Promise<V> await() throws InterruptedException {
254         if (isDone()) {
255             return this;
256         }
257 
258         if (Thread.interrupted()) {
259             throw new InterruptedException(toString());
260         }
261 
262         checkDeadLock();
263 
264         synchronized (this) {
265             while (!isDone()) {
266                 incWaiters();
267                 try {
268                     wait();
269                 } finally {
270                     decWaiters();
271                 }
272             }
273         }
274         return this;
275     }
276 
277     @Override
278     public Promise<V> awaitUninterruptibly() {
279         if (isDone()) {
280             return this;
281         }
282 
283         checkDeadLock();
284 
285         boolean interrupted = false;
286         synchronized (this) {
287             while (!isDone()) {
288                 incWaiters();
289                 try {
290                     wait();
291                 } catch (InterruptedException e) {
292                     // Interrupted while waiting.
293                     interrupted = true;
294                 } finally {
295                     decWaiters();
296                 }
297             }
298         }
299 
300         if (interrupted) {
301             Thread.currentThread().interrupt();
302         }
303 
304         return this;
305     }
306 
307     @Override
308     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
309         return await0(unit.toNanos(timeout), true);
310     }
311 
312     @Override
313     public boolean await(long timeoutMillis) throws InterruptedException {
314         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
315     }
316 
317     @Override
318     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
319         try {
320             return await0(unit.toNanos(timeout), false);
321         } catch (InterruptedException e) {
322             // Should not be raised at all.
323             throw new InternalError();
324         }
325     }
326 
327     @Override
328     public boolean awaitUninterruptibly(long timeoutMillis) {
329         try {
330             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
331         } catch (InterruptedException e) {
332             // Should not be raised at all.
333             throw new InternalError();
334         }
335     }
336 
337     @SuppressWarnings("unchecked")
338     @Override
339     public V getNow() {
340         Object result = this.result;
341         if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
342             return null;
343         }
344         return (V) result;
345     }
346 
347     @SuppressWarnings("unchecked")
348     @Override
349     public V get() throws InterruptedException, ExecutionException {
350         Object result = this.result;
351         if (!isDone0(result)) {
352             await();
353             result = this.result;
354         }
355         if (result == SUCCESS || result == UNCANCELLABLE) {
356             return null;
357         }
358         Throwable cause = cause0(result);
359         if (cause == null) {
360             return (V) result;
361         }
362         if (cause instanceof CancellationException) {
363             throw (CancellationException) cause;
364         }
365         throw new ExecutionException(cause);
366     }
367 
368     @SuppressWarnings("unchecked")
369     @Override
370     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
371         Object result = this.result;
372         if (!isDone0(result)) {
373             if (!await(timeout, unit)) {
374                 throw new TimeoutException("timeout after " + timeout + " " + unit.name().toLowerCase(Locale.ENGLISH));
375             }
376             result = this.result;
377         }
378         if (result == SUCCESS || result == UNCANCELLABLE) {
379             return null;
380         }
381         Throwable cause = cause0(result);
382         if (cause == null) {
383             return (V) result;
384         }
385         if (cause instanceof CancellationException) {
386             throw (CancellationException) cause;
387         }
388         throw new ExecutionException(cause);
389     }
390 
391     /**
392      * {@inheritDoc}
393      *
394      * @param mayInterruptIfRunning this value has no effect in this implementation.
395      */
396     @Override
397     public boolean cancel(boolean mayInterruptIfRunning) {
398         if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
399             if (checkNotifyWaiters()) {
400                 notifyListeners();
401             }
402             return true;
403         }
404         return false;
405     }
406 
407     @Override
408     public boolean isCancelled() {
409         return isCancelled0(result);
410     }
411 
412     @Override
413     public boolean isDone() {
414         return isDone0(result);
415     }
416 
417     @Override
418     public Promise<V> sync() throws InterruptedException {
419         await();
420         rethrowIfFailed();
421         return this;
422     }
423 
424     @Override
425     public Promise<V> syncUninterruptibly() {
426         awaitUninterruptibly();
427         rethrowIfFailed();
428         return this;
429     }
430 
431     @Override
432     public String toString() {
433         return toStringBuilder().toString();
434     }
435 
436     protected StringBuilder toStringBuilder() {
437         StringBuilder buf = new StringBuilder(64)
438                 .append(StringUtil.simpleClassName(this))
439                 .append('@')
440                 .append(Integer.toHexString(hashCode()));
441 
442         Object result = this.result;
443         if (result == SUCCESS) {
444             buf.append("(success)");
445         } else if (result == UNCANCELLABLE) {
446             buf.append("(uncancellable)");
447         } else if (result instanceof CauseHolder) {
448             buf.append("(failure: ")
449                     .append(((CauseHolder) result).cause)
450                     .append(')');
451         } else if (result != null) {
452             buf.append("(success: ")
453                     .append(result)
454                     .append(')');
455         } else {
456             buf.append("(incomplete)");
457         }
458 
459         return buf;
460     }
461 
462     /**
463      * Get the executor used to notify listeners when this promise is complete.
464      * <p>
465      * It is assumed this executor will protect against {@link StackOverflowError} exceptions.
466      * The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
467      * depth exceeds a threshold.
468      * @return The executor used to notify listeners when this promise is complete.
469      */
470     protected EventExecutor executor() {
471         return executor;
472     }
473 
474     protected void checkDeadLock() {
475         EventExecutor e = executor();
476         if (e != null && e.inEventLoop()) {
477             throw new BlockingOperationException(toString());
478         }
479     }
480 
481     /**
482      * Notify a listener that a future has completed.
483      * <p>
484      * This method has a fixed depth of {@link #MAX_LISTENER_STACK_DEPTH} that will limit recursion to prevent
485      * {@link StackOverflowError} and will stop notifying listeners added after this threshold is exceeded.
486      * @param eventExecutor the executor to use to notify the listener {@code listener}.
487      * @param future the future that is complete.
488      * @param listener the listener to notify.
489      */
490     protected static void notifyListener(
491             EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
492         notifyListenerWithStackOverFlowProtection(
493                 checkNotNull(eventExecutor, "eventExecutor"),
494                 checkNotNull(future, "future"),
495                 checkNotNull(listener, "listener"));
496     }
497 
498     private void notifyListeners() {
499         EventExecutor executor = executor();
500         if (executor.inEventLoop()) {
501             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
502             final int stackDepth = threadLocals.futureListenerStackDepth();
503             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
504                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
505                 try {
506                     notifyListenersNow();
507                 } finally {
508                     threadLocals.setFutureListenerStackDepth(stackDepth);
509                 }
510                 return;
511             }
512         }
513 
514         safeExecute(executor, new Runnable() {
515             @Override
516             public void run() {
517                 notifyListenersNow();
518             }
519         });
520     }
521 
522     /**
523      * The logic in this method should be identical to {@link #notifyListeners()} but
524      * cannot share code because the listener(s) cannot be cached for an instance of {@link DefaultPromise} since the
525      * listener(s) may be changed and is protected by a synchronized operation.
526      */
527     private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
528                                                                   final Future<?> future,
529                                                                   final GenericFutureListener<?> listener) {
530         if (executor.inEventLoop()) {
531             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
532             final int stackDepth = threadLocals.futureListenerStackDepth();
533             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
534                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
535                 try {
536                     notifyListener0(future, listener);
537                 } finally {
538                     threadLocals.setFutureListenerStackDepth(stackDepth);
539                 }
540                 return;
541             }
542         }
543 
544         safeExecute(executor, new Runnable() {
545             @Override
546             public void run() {
547                 notifyListener0(future, listener);
548             }
549         });
550     }
551 
552     private void notifyListenersNow() {
553         GenericFutureListener listener;
554         DefaultFutureListeners listeners;
555         synchronized (this) {
556             listener = this.listener;
557             listeners = this.listeners;
558             // Only proceed if there are listeners to notify and we are not already notifying listeners.
559             if (notifyingListeners || (listener == null && listeners == null)) {
560                 return;
561             }
562             notifyingListeners = true;
563             if (listener != null) {
564                 this.listener = null;
565             } else {
566                 this.listeners = null;
567             }
568         }
569         for (;;) {
570             if (listener != null) {
571                 notifyListener0(this, listener);
572             } else {
573                 notifyListeners0(listeners);
574             }
575             synchronized (this) {
576                 if (this.listener == null && this.listeners == null) {
577                     // Nothing can throw from within this method, so setting notifyingListeners back to false does not
578                     // need to be in a finally block.
579                     notifyingListeners = false;
580                     return;
581                 }
582                 listener = this.listener;
583                 listeners = this.listeners;
584                 if (listener != null) {
585                     this.listener = null;
586                 } else {
587                     this.listeners = null;
588                 }
589             }
590         }
591     }
592 
593     private void notifyListeners0(DefaultFutureListeners listeners) {
594         GenericFutureListener<?>[] a = listeners.listeners();
595         int size = listeners.size();
596         for (int i = 0; i < size; i ++) {
597             notifyListener0(this, a[i]);
598         }
599     }
600 
601     @SuppressWarnings({ "unchecked", "rawtypes" })
602     private static void notifyListener0(Future future, GenericFutureListener l) {
603         try {
604             l.operationComplete(future);
605         } catch (Throwable t) {
606             if (logger.isWarnEnabled()) {
607                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
608             }
609         }
610     }
611 
612     private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
613         if (this.listener == null) {
614             if (listeners == null) {
615                 this.listener = listener;
616             } else {
617                 listeners.add(listener);
618             }
619         } else {
620             assert listeners == null;
621             listeners = new DefaultFutureListeners(this.listener, listener);
622             this.listener = null;
623         }
624     }
625 
626     private void removeListener0(GenericFutureListener<? extends Future<? super V>> toRemove) {
627         if (listener == toRemove) {
628             listener = null;
629         } else if (listeners != null) {
630             listeners.remove(toRemove);
631             // Removal is rare, no need for compaction
632             if (listeners.size() == 0) {
633                 listeners = null;
634             }
635         }
636     }
637 
638     private boolean setSuccess0(V result) {
639         return setValue0(result == null ? SUCCESS : result);
640     }
641 
642     private boolean setFailure0(Throwable cause) {
643         return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
644     }
645 
646     private boolean setValue0(Object objResult) {
647         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
648             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
649             if (checkNotifyWaiters()) {
650                 notifyListeners();
651             }
652             return true;
653         }
654         return false;
655     }
656 
657     /**
658      * Check if there are any waiters and if so notify these.
659      * @return {@code true} if there are any listeners attached to the promise, {@code false} otherwise.
660      */
661     private synchronized boolean checkNotifyWaiters() {
662         if (waiters > 0) {
663             notifyAll();
664         }
665         return listener != null || listeners != null;
666     }
667 
668     private void incWaiters() {
669         if (waiters == Short.MAX_VALUE) {
670             throw new IllegalStateException("too many waiters: " + this);
671         }
672         ++waiters;
673     }
674 
675     private void decWaiters() {
676         --waiters;
677     }
678 
679     private void rethrowIfFailed() {
680         Throwable cause = cause();
681         if (cause == null) {
682             return;
683         }
684 
685         if (!(cause instanceof CancellationException) && ThrowableUtil.getSuppressed(cause).length == 0) {
686             ThrowableUtil.addSuppressed(cause, new RuntimeException("Rethrowing promise failure cause"));
687         }
688         PlatformDependent.throwException(cause);
689     }
690 
691     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
692         if (isDone()) {
693             return true;
694         }
695 
696         if (timeoutNanos <= 0) {
697             return isDone();
698         }
699 
700         if (interruptable && Thread.interrupted()) {
701             throw new InterruptedException(toString());
702         }
703 
704         checkDeadLock();
705 
706         // Start counting time from here instead of the first line of this method,
707         // to avoid/postpone performance cost of System.nanoTime().
708         final long startTime = System.nanoTime();
709         synchronized (this) {
710             boolean interrupted = false;
711             try {
712                 long waitTime = timeoutNanos;
713                 while (!isDone() && waitTime > 0) {
714                     incWaiters();
715                     try {
716                         wait(waitTime / 1000000, (int) (waitTime % 1000000));
717                     } catch (InterruptedException e) {
718                         if (interruptable) {
719                             throw e;
720                         } else {
721                             interrupted = true;
722                         }
723                     } finally {
724                         decWaiters();
725                     }
726                     // Check isDone() in advance, try to avoid calculating the elapsed time later.
727                     if (isDone()) {
728                         return true;
729                     }
730                     // Calculate the elapsed time here instead of in the while condition,
731                     // try to avoid performance cost of System.nanoTime() in the first loop of while.
732                     waitTime = timeoutNanos - (System.nanoTime() - startTime);
733                 }
734                 return isDone();
735             } finally {
736                 if (interrupted) {
737                     Thread.currentThread().interrupt();
738                 }
739             }
740         }
741     }
742 
743     /**
744      * Notify all progressive listeners.
745      * <p>
746      * No attempt is made to ensure notification order if multiple calls are made to this method before
747      * the original invocation completes.
748      * <p>
749      * This will do an iteration over all listeners to get all of type {@link GenericProgressiveFutureListener}s.
750      * @param progress the new progress.
751      * @param total the total progress.
752      */
753     @SuppressWarnings("unchecked")
754     void notifyProgressiveListeners(final long progress, final long total) {
755         final Object listeners = progressiveListeners();
756         if (listeners == null) {
757             return;
758         }
759 
760         final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
761 
762         EventExecutor executor = executor();
763         if (executor.inEventLoop()) {
764             if (listeners instanceof GenericProgressiveFutureListener[]) {
765                 notifyProgressiveListeners0(
766                         self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
767             } else {
768                 notifyProgressiveListener0(
769                         self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
770             }
771         } else {
772             if (listeners instanceof GenericProgressiveFutureListener[]) {
773                 final GenericProgressiveFutureListener<?>[] array =
774                         (GenericProgressiveFutureListener<?>[]) listeners;
775                 safeExecute(executor, new Runnable() {
776                     @Override
777                     public void run() {
778                         notifyProgressiveListeners0(self, array, progress, total);
779                     }
780                 });
781             } else {
782                 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
783                         (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
784                 safeExecute(executor, new Runnable() {
785                     @Override
786                     public void run() {
787                         notifyProgressiveListener0(self, l, progress, total);
788                     }
789                 });
790             }
791         }
792     }
793 
794     /**
795      * Returns a {@link GenericProgressiveFutureListener}, an array of {@link GenericProgressiveFutureListener}, or
796      * {@code null}.
797      */
798     private synchronized Object progressiveListeners() {
799         final GenericFutureListener listener = this.listener;
800         final DefaultFutureListeners listeners = this.listeners;
801         if (listener == null && listeners == null) {
802             // No listeners added
803             return null;
804         }
805 
806         if (listeners != null) {
807             // Copy DefaultFutureListeners into an array of listeners.
808             DefaultFutureListeners dfl = listeners;
809             int progressiveSize = dfl.progressiveSize();
810             switch (progressiveSize) {
811                 case 0:
812                     return null;
813                 case 1:
814                     for (GenericFutureListener<?> l: dfl.listeners()) {
815                         if (l instanceof GenericProgressiveFutureListener) {
816                             return l;
817                         }
818                     }
819                     return null;
820             }
821 
822             GenericFutureListener<?>[] array = dfl.listeners();
823             GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
824             for (int i = 0, j = 0; j < progressiveSize; i ++) {
825                 GenericFutureListener<?> l = array[i];
826                 if (l instanceof GenericProgressiveFutureListener) {
827                     copy[j ++] = (GenericProgressiveFutureListener<?>) l;
828                 }
829             }
830 
831             return copy;
832         } else if (listener instanceof GenericProgressiveFutureListener) {
833             return listener;
834         } else {
835             // Only one listener was added and it's not a progressive listener.
836             return null;
837         }
838     }
839 
840     private static void notifyProgressiveListeners0(
841             ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
842         for (GenericProgressiveFutureListener<?> l: listeners) {
843             if (l == null) {
844                 break;
845             }
846             notifyProgressiveListener0(future, l, progress, total);
847         }
848     }
849 
850     @SuppressWarnings({ "unchecked", "rawtypes" })
851     private static void notifyProgressiveListener0(
852             ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
853         try {
854             l.operationProgressed(future, progress, total);
855         } catch (Throwable t) {
856             if (logger.isWarnEnabled()) {
857                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
858             }
859         }
860     }
861 
862     private static boolean isCancelled0(Object result) {
863         return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
864     }
865 
866     private static boolean isDone0(Object result) {
867         return result != null && result != UNCANCELLABLE;
868     }
869 
870     private static final class CauseHolder {
871         final Throwable cause;
872         CauseHolder(Throwable cause) {
873             this.cause = cause;
874         }
875     }
876 
877     private static void safeExecute(EventExecutor executor, Runnable task) {
878         try {
879             executor.execute(task);
880         } catch (Throwable t) {
881             rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
882         }
883     }
884 
885     private static final class StacklessCancellationException extends CancellationException {
886 
887         private static final long serialVersionUID = -2974906711413716191L;
888 
889         private StacklessCancellationException() { }
890 
891         // Override fillInStackTrace() so we not populate the backtrace via a native call and so leak the
892         // Classloader.
893         @Override
894         public Throwable fillInStackTrace() {
895             return this;
896         }
897 
898         static StacklessCancellationException newInstance(Class<?> clazz, String method) {
899             return ThrowableUtil.unknownStackTrace(new StacklessCancellationException(), clazz, method);
900         }
901     }
902 }