View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.InternalThreadLocalMap;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.StringUtil;
21  import io.netty.util.internal.SystemPropertyUtil;
22  import io.netty.util.internal.ThrowableUtil;
23  import io.netty.util.internal.logging.InternalLogger;
24  import io.netty.util.internal.logging.InternalLoggerFactory;
25  
26  import java.util.concurrent.CancellationException;
27  import java.util.concurrent.CompletionException;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.TimeoutException;
31  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
32  
33  import static io.netty.util.internal.ObjectUtil.checkNotNull;
34  import static java.util.concurrent.TimeUnit.MILLISECONDS;
35  
36  public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
37      /**
38       * System property with integer type value, that determine the max reentrancy/recursion level for when
39       * listener notifications prompt other listeners to be notified.
40       * <p>
41       * When the reentrancy/recursion level becomes greater than this number, a new task will instead be scheduled
42       * on the event loop, to finish notifying any subsequent listners.
43       * <p>
44       * The default value is {@code 8}.
45       */
46      public static final String PROPERTY_MAX_LISTENER_STACK_DEPTH = "io.netty.defaultPromise.maxListenerStackDepth";
47  
48      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
49      private static final InternalLogger rejectedExecutionLogger =
50              InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
51      private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
52              SystemPropertyUtil.getInt(PROPERTY_MAX_LISTENER_STACK_DEPTH, 8));
53      @SuppressWarnings("rawtypes")
54      private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
55              AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
56      private static final Object SUCCESS = new Object();
57      private static final Object UNCANCELLABLE = new Object();
58      private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
59              StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
60      private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
61  
62      private volatile Object result;
63      private final EventExecutor executor;
64  
65      /**
66       * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
67       * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
68       * <p>
69       * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
70       */
71      private GenericFutureListener<? extends Future<?>> listener;
72      private DefaultFutureListeners listeners;
73      /**
74       * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
75       */
76      private short waiters;
77  
78      /**
79       * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
80       * executor changes.
81       */
82      private boolean notifyingListeners;
83  
84      /**
85       * Creates a new instance.
86       * <p>
87       * It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
88       *
89       * @param executor
90       *        the {@link EventExecutor} which is used to notify the promise once it is complete.
91       *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
92       *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
93       *        depth exceeds a threshold.
94       *
95       */
96      public DefaultPromise(EventExecutor executor) {
97          this.executor = checkNotNull(executor, "executor");
98      }
99  
100     /**
101      * See {@link #executor()} for expectations of the executor.
102      */
103     protected DefaultPromise() {
104         // only for subclasses
105         executor = null;
106     }
107 
108     @Override
109     public Promise<V> setSuccess(V result) {
110         if (setSuccess0(result)) {
111             return this;
112         }
113         throw new IllegalStateException("complete already: " + this);
114     }
115 
116     @Override
117     public boolean trySuccess(V result) {
118         return setSuccess0(result);
119     }
120 
121     @Override
122     public Promise<V> setFailure(Throwable cause) {
123         if (setFailure0(cause)) {
124             return this;
125         }
126         throw new IllegalStateException("complete already: " + this, cause);
127     }
128 
129     @Override
130     public boolean tryFailure(Throwable cause) {
131         return setFailure0(cause);
132     }
133 
134     @Override
135     public boolean setUncancellable() {
136         if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
137             return true;
138         }
139         Object result = this.result;
140         return !isDone0(result) || !isCancelled0(result);
141     }
142 
143     @Override
144     public boolean isSuccess() {
145         Object result = this.result;
146         return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
147     }
148 
149     @Override
150     public boolean isCancellable() {
151         return result == null;
152     }
153 
154     private static final class LeanCancellationException extends CancellationException {
155         private static final long serialVersionUID = 2794674970981187807L;
156 
157         // Suppress a warning since the method doesn't need synchronization
158         @Override
159         public Throwable fillInStackTrace() {
160             setStackTrace(CANCELLATION_STACK);
161             return this;
162         }
163 
164         @Override
165         public String toString() {
166             return CancellationException.class.getName();
167         }
168     }
169 
170     @Override
171     public Throwable cause() {
172         return cause0(result);
173     }
174 
175     private Throwable cause0(Object result) {
176         if (!(result instanceof CauseHolder)) {
177             return null;
178         }
179         if (result == CANCELLATION_CAUSE_HOLDER) {
180             CancellationException ce = new LeanCancellationException();
181             if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
182                 return ce;
183             }
184             result = this.result;
185         }
186         return ((CauseHolder) result).cause;
187     }
188 
189     @Override
190     public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
191         checkNotNull(listener, "listener");
192 
193         synchronized (this) {
194             addListener0(listener);
195         }
196 
197         if (isDone()) {
198             notifyListeners();
199         }
200 
201         return this;
202     }
203 
204     @Override
205     public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
206         checkNotNull(listeners, "listeners");
207 
208         synchronized (this) {
209             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
210                 if (listener == null) {
211                     break;
212                 }
213                 addListener0(listener);
214             }
215         }
216 
217         if (isDone()) {
218             notifyListeners();
219         }
220 
221         return this;
222     }
223 
224     @Override
225     public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
226         checkNotNull(listener, "listener");
227 
228         synchronized (this) {
229             removeListener0(listener);
230         }
231 
232         return this;
233     }
234 
235     @Override
236     public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
237         checkNotNull(listeners, "listeners");
238 
239         synchronized (this) {
240             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
241                 if (listener == null) {
242                     break;
243                 }
244                 removeListener0(listener);
245             }
246         }
247 
248         return this;
249     }
250 
251     @Override
252     public Promise<V> await() throws InterruptedException {
253         if (isDone()) {
254             return this;
255         }
256 
257         if (Thread.interrupted()) {
258             throw new InterruptedException(toString());
259         }
260 
261         checkDeadLock();
262 
263         synchronized (this) {
264             while (!isDone()) {
265                 incWaiters();
266                 try {
267                     wait();
268                 } finally {
269                     decWaiters();
270                 }
271             }
272         }
273         return this;
274     }
275 
276     @Override
277     public Promise<V> awaitUninterruptibly() {
278         if (isDone()) {
279             return this;
280         }
281 
282         checkDeadLock();
283 
284         boolean interrupted = false;
285         synchronized (this) {
286             while (!isDone()) {
287                 incWaiters();
288                 try {
289                     wait();
290                 } catch (InterruptedException e) {
291                     // Interrupted while waiting.
292                     interrupted = true;
293                 } finally {
294                     decWaiters();
295                 }
296             }
297         }
298 
299         if (interrupted) {
300             Thread.currentThread().interrupt();
301         }
302 
303         return this;
304     }
305 
306     @Override
307     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
308         return await0(unit.toNanos(timeout), true);
309     }
310 
311     @Override
312     public boolean await(long timeoutMillis) throws InterruptedException {
313         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
314     }
315 
316     @Override
317     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
318         try {
319             return await0(unit.toNanos(timeout), false);
320         } catch (InterruptedException e) {
321             // Should not be raised at all.
322             throw new InternalError();
323         }
324     }
325 
326     @Override
327     public boolean awaitUninterruptibly(long timeoutMillis) {
328         try {
329             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
330         } catch (InterruptedException e) {
331             // Should not be raised at all.
332             throw new InternalError();
333         }
334     }
335 
336     @SuppressWarnings("unchecked")
337     @Override
338     public V getNow() {
339         Object result = this.result;
340         if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
341             return null;
342         }
343         return (V) result;
344     }
345 
346     @SuppressWarnings("unchecked")
347     @Override
348     public V get() throws InterruptedException, ExecutionException {
349         Object result = this.result;
350         if (!isDone0(result)) {
351             await();
352             result = this.result;
353         }
354         if (result == SUCCESS || result == UNCANCELLABLE) {
355             return null;
356         }
357         Throwable cause = cause0(result);
358         if (cause == null) {
359             return (V) result;
360         }
361         if (cause instanceof CancellationException) {
362             throw (CancellationException) cause;
363         }
364         throw new ExecutionException(cause);
365     }
366 
367     @SuppressWarnings("unchecked")
368     @Override
369     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
370         Object result = this.result;
371         if (!isDone0(result)) {
372             if (!await(timeout, unit)) {
373                 throw new TimeoutException();
374             }
375             result = this.result;
376         }
377         if (result == SUCCESS || result == UNCANCELLABLE) {
378             return null;
379         }
380         Throwable cause = cause0(result);
381         if (cause == null) {
382             return (V) result;
383         }
384         if (cause instanceof CancellationException) {
385             throw (CancellationException) cause;
386         }
387         throw new ExecutionException(cause);
388     }
389 
390     /**
391      * {@inheritDoc}
392      *
393      * @param mayInterruptIfRunning this value has no effect in this implementation.
394      */
395     @Override
396     public boolean cancel(boolean mayInterruptIfRunning) {
397         if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
398             if (checkNotifyWaiters()) {
399                 notifyListeners();
400             }
401             return true;
402         }
403         return false;
404     }
405 
406     @Override
407     public boolean isCancelled() {
408         return isCancelled0(result);
409     }
410 
411     @Override
412     public boolean isDone() {
413         return isDone0(result);
414     }
415 
416     @Override
417     public Promise<V> sync() throws InterruptedException {
418         await();
419         rethrowIfFailed();
420         return this;
421     }
422 
423     @Override
424     public Promise<V> syncUninterruptibly() {
425         awaitUninterruptibly();
426         rethrowIfFailed();
427         return this;
428     }
429 
430     @Override
431     public String toString() {
432         return toStringBuilder().toString();
433     }
434 
435     protected StringBuilder toStringBuilder() {
436         StringBuilder buf = new StringBuilder(64)
437                 .append(StringUtil.simpleClassName(this))
438                 .append('@')
439                 .append(Integer.toHexString(hashCode()));
440 
441         Object result = this.result;
442         if (result == SUCCESS) {
443             buf.append("(success)");
444         } else if (result == UNCANCELLABLE) {
445             buf.append("(uncancellable)");
446         } else if (result instanceof CauseHolder) {
447             buf.append("(failure: ")
448                     .append(((CauseHolder) result).cause)
449                     .append(')');
450         } else if (result != null) {
451             buf.append("(success: ")
452                     .append(result)
453                     .append(')');
454         } else {
455             buf.append("(incomplete)");
456         }
457 
458         return buf;
459     }
460 
461     /**
462      * Get the executor used to notify listeners when this promise is complete.
463      * <p>
464      * It is assumed this executor will protect against {@link StackOverflowError} exceptions.
465      * The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
466      * depth exceeds a threshold.
467      * @return The executor used to notify listeners when this promise is complete.
468      */
469     protected EventExecutor executor() {
470         return executor;
471     }
472 
473     protected void checkDeadLock() {
474         EventExecutor e = executor();
475         if (e != null && e.inEventLoop()) {
476             throw new BlockingOperationException(toString());
477         }
478     }
479 
480     /**
481      * Notify a listener that a future has completed.
482      * <p>
483      * This method has a fixed depth of {@link #MAX_LISTENER_STACK_DEPTH} that will limit recursion to prevent
484      * {@link StackOverflowError} and will stop notifying listeners added after this threshold is exceeded.
485      * @param eventExecutor the executor to use to notify the listener {@code listener}.
486      * @param future the future that is complete.
487      * @param listener the listener to notify.
488      */
489     protected static void notifyListener(
490             EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
491         notifyListenerWithStackOverFlowProtection(
492                 checkNotNull(eventExecutor, "eventExecutor"),
493                 checkNotNull(future, "future"),
494                 checkNotNull(listener, "listener"));
495     }
496 
497     private void notifyListeners() {
498         EventExecutor executor = executor();
499         if (executor.inEventLoop()) {
500             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
501             final int stackDepth = threadLocals.futureListenerStackDepth();
502             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
503                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
504                 try {
505                     notifyListenersNow();
506                 } finally {
507                     threadLocals.setFutureListenerStackDepth(stackDepth);
508                 }
509                 return;
510             }
511         }
512 
513         safeExecute(executor, new Runnable() {
514             @Override
515             public void run() {
516                 notifyListenersNow();
517             }
518         });
519     }
520 
521     /**
522      * The logic in this method should be identical to {@link #notifyListeners()} but
523      * cannot share code because the listener(s) cannot be cached for an instance of {@link DefaultPromise} since the
524      * listener(s) may be changed and is protected by a synchronized operation.
525      */
526     private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
527                                                                   final Future<?> future,
528                                                                   final GenericFutureListener<?> listener) {
529         if (executor.inEventLoop()) {
530             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
531             final int stackDepth = threadLocals.futureListenerStackDepth();
532             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
533                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
534                 try {
535                     notifyListener0(future, listener);
536                 } finally {
537                     threadLocals.setFutureListenerStackDepth(stackDepth);
538                 }
539                 return;
540             }
541         }
542 
543         safeExecute(executor, new Runnable() {
544             @Override
545             public void run() {
546                 notifyListener0(future, listener);
547             }
548         });
549     }
550 
551     private void notifyListenersNow() {
552         GenericFutureListener listener;
553         DefaultFutureListeners listeners;
554         synchronized (this) {
555             listener = this.listener;
556             listeners = this.listeners;
557             // Only proceed if there are listeners to notify and we are not already notifying listeners.
558             if (notifyingListeners || (listener == null && listeners == null)) {
559                 return;
560             }
561             notifyingListeners = true;
562             if (listener != null) {
563                 this.listener = null;
564             } else {
565                 this.listeners = null;
566             }
567         }
568         for (;;) {
569             if (listener != null) {
570                 notifyListener0(this, listener);
571             } else {
572                 notifyListeners0(listeners);
573             }
574             synchronized (this) {
575                 if (this.listener == null && this.listeners == null) {
576                     // Nothing can throw from within this method, so setting notifyingListeners back to false does not
577                     // need to be in a finally block.
578                     notifyingListeners = false;
579                     return;
580                 }
581                 listener = this.listener;
582                 listeners = this.listeners;
583                 if (listener != null) {
584                     this.listener = null;
585                 } else {
586                     this.listeners = null;
587                 }
588             }
589         }
590     }
591 
592     private void notifyListeners0(DefaultFutureListeners listeners) {
593         GenericFutureListener<?>[] a = listeners.listeners();
594         int size = listeners.size();
595         for (int i = 0; i < size; i ++) {
596             notifyListener0(this, a[i]);
597         }
598     }
599 
600     @SuppressWarnings({ "unchecked", "rawtypes" })
601     private static void notifyListener0(Future future, GenericFutureListener l) {
602         try {
603             l.operationComplete(future);
604         } catch (Throwable t) {
605             if (logger.isWarnEnabled()) {
606                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
607             }
608         }
609     }
610 
611     private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
612         if (this.listener == null) {
613             if (listeners == null) {
614                 this.listener = listener;
615             } else {
616                 listeners.add(listener);
617             }
618         } else {
619             assert listeners == null;
620             listeners = new DefaultFutureListeners(this.listener, listener);
621             this.listener = null;
622         }
623     }
624 
625     private void removeListener0(GenericFutureListener<? extends Future<? super V>> toRemove) {
626         if (listener == toRemove) {
627             listener = null;
628         } else if (listeners != null) {
629             listeners.remove(toRemove);
630             // Removal is rare, no need for compaction
631             if (listeners.size() == 0) {
632                 listeners = null;
633             }
634         }
635     }
636 
637     private boolean setSuccess0(V result) {
638         return setValue0(result == null ? SUCCESS : result);
639     }
640 
641     private boolean setFailure0(Throwable cause) {
642         return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
643     }
644 
645     private boolean setValue0(Object objResult) {
646         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
647             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
648             if (checkNotifyWaiters()) {
649                 notifyListeners();
650             }
651             return true;
652         }
653         return false;
654     }
655 
656     /**
657      * Check if there are any waiters and if so notify these.
658      * @return {@code true} if there are any listeners attached to the promise, {@code false} otherwise.
659      */
660     private synchronized boolean checkNotifyWaiters() {
661         if (waiters > 0) {
662             notifyAll();
663         }
664         return listener != null || listeners != null;
665     }
666 
667     private void incWaiters() {
668         if (waiters == Short.MAX_VALUE) {
669             throw new IllegalStateException("too many waiters: " + this);
670         }
671         ++waiters;
672     }
673 
674     private void decWaiters() {
675         --waiters;
676     }
677 
678     private void rethrowIfFailed() {
679         Throwable cause = cause();
680         if (cause == null) {
681             return;
682         }
683 
684         if (!(cause instanceof CancellationException) && cause.getSuppressed().length == 0) {
685             cause.addSuppressed(new CompletionException("Rethrowing promise failure cause", null));
686         }
687         PlatformDependent.throwException(cause);
688     }
689 
690     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
691         if (isDone()) {
692             return true;
693         }
694 
695         if (timeoutNanos <= 0) {
696             return isDone();
697         }
698 
699         if (interruptable && Thread.interrupted()) {
700             throw new InterruptedException(toString());
701         }
702 
703         checkDeadLock();
704 
705         // Start counting time from here instead of the first line of this method,
706         // to avoid/postpone performance cost of System.nanoTime().
707         final long startTime = System.nanoTime();
708         synchronized (this) {
709             boolean interrupted = false;
710             try {
711                 long waitTime = timeoutNanos;
712                 while (!isDone() && waitTime > 0) {
713                     incWaiters();
714                     try {
715                         wait(waitTime / 1000000, (int) (waitTime % 1000000));
716                     } catch (InterruptedException e) {
717                         if (interruptable) {
718                             throw e;
719                         } else {
720                             interrupted = true;
721                         }
722                     } finally {
723                         decWaiters();
724                     }
725                     // Check isDone() in advance, try to avoid calculating the elapsed time later.
726                     if (isDone()) {
727                         return true;
728                     }
729                     // Calculate the elapsed time here instead of in the while condition,
730                     // try to avoid performance cost of System.nanoTime() in the first loop of while.
731                     waitTime = timeoutNanos - (System.nanoTime() - startTime);
732                 }
733                 return isDone();
734             } finally {
735                 if (interrupted) {
736                     Thread.currentThread().interrupt();
737                 }
738             }
739         }
740     }
741 
742     /**
743      * Notify all progressive listeners.
744      * <p>
745      * No attempt is made to ensure notification order if multiple calls are made to this method before
746      * the original invocation completes.
747      * <p>
748      * This will do an iteration over all listeners to get all of type {@link GenericProgressiveFutureListener}s.
749      * @param progress the new progress.
750      * @param total the total progress.
751      */
752     @SuppressWarnings("unchecked")
753     void notifyProgressiveListeners(final long progress, final long total) {
754         final Object listeners = progressiveListeners();
755         if (listeners == null) {
756             return;
757         }
758 
759         final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
760 
761         EventExecutor executor = executor();
762         if (executor.inEventLoop()) {
763             if (listeners instanceof GenericProgressiveFutureListener[]) {
764                 notifyProgressiveListeners0(
765                         self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
766             } else {
767                 notifyProgressiveListener0(
768                         self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
769             }
770         } else {
771             if (listeners instanceof GenericProgressiveFutureListener[]) {
772                 final GenericProgressiveFutureListener<?>[] array =
773                         (GenericProgressiveFutureListener<?>[]) listeners;
774                 safeExecute(executor, new Runnable() {
775                     @Override
776                     public void run() {
777                         notifyProgressiveListeners0(self, array, progress, total);
778                     }
779                 });
780             } else {
781                 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
782                         (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
783                 safeExecute(executor, new Runnable() {
784                     @Override
785                     public void run() {
786                         notifyProgressiveListener0(self, l, progress, total);
787                     }
788                 });
789             }
790         }
791     }
792 
793     /**
794      * Returns a {@link GenericProgressiveFutureListener}, an array of {@link GenericProgressiveFutureListener}, or
795      * {@code null}.
796      */
797     private synchronized Object progressiveListeners() {
798         final GenericFutureListener listener = this.listener;
799         final DefaultFutureListeners listeners = this.listeners;
800         if (listener == null && listeners == null) {
801             // No listeners added
802             return null;
803         }
804 
805         if (listeners != null) {
806             // Copy DefaultFutureListeners into an array of listeners.
807             DefaultFutureListeners dfl = listeners;
808             int progressiveSize = dfl.progressiveSize();
809             switch (progressiveSize) {
810                 case 0:
811                     return null;
812                 case 1:
813                     for (GenericFutureListener<?> l: dfl.listeners()) {
814                         if (l instanceof GenericProgressiveFutureListener) {
815                             return l;
816                         }
817                     }
818                     return null;
819             }
820 
821             GenericFutureListener<?>[] array = dfl.listeners();
822             GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
823             for (int i = 0, j = 0; j < progressiveSize; i ++) {
824                 GenericFutureListener<?> l = array[i];
825                 if (l instanceof GenericProgressiveFutureListener) {
826                     copy[j ++] = (GenericProgressiveFutureListener<?>) l;
827                 }
828             }
829 
830             return copy;
831         } else if (listener instanceof GenericProgressiveFutureListener) {
832             return listener;
833         } else {
834             // Only one listener was added and it's not a progressive listener.
835             return null;
836         }
837     }
838 
839     private static void notifyProgressiveListeners0(
840             ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
841         for (GenericProgressiveFutureListener<?> l: listeners) {
842             if (l == null) {
843                 break;
844             }
845             notifyProgressiveListener0(future, l, progress, total);
846         }
847     }
848 
849     @SuppressWarnings({ "unchecked", "rawtypes" })
850     private static void notifyProgressiveListener0(
851             ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
852         try {
853             l.operationProgressed(future, progress, total);
854         } catch (Throwable t) {
855             if (logger.isWarnEnabled()) {
856                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
857             }
858         }
859     }
860 
861     private static boolean isCancelled0(Object result) {
862         return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
863     }
864 
865     private static boolean isDone0(Object result) {
866         return result != null && result != UNCANCELLABLE;
867     }
868 
869     private static final class CauseHolder {
870         final Throwable cause;
871         CauseHolder(Throwable cause) {
872             this.cause = cause;
873         }
874     }
875 
876     private static void safeExecute(EventExecutor executor, Runnable task) {
877         try {
878             executor.execute(task);
879         } catch (Throwable t) {
880             rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
881         }
882     }
883 
884     private static final class StacklessCancellationException extends CancellationException {
885 
886         private static final long serialVersionUID = -2974906711413716191L;
887 
888         private StacklessCancellationException() { }
889 
890         // Override fillInStackTrace() so we not populate the backtrace via a native call and so leak the
891         // Classloader.
892         @Override
893         public Throwable fillInStackTrace() {
894             return this;
895         }
896 
897         static StacklessCancellationException newInstance(Class<?> clazz, String method) {
898             return ThrowableUtil.unknownStackTrace(new StacklessCancellationException(), clazz, method);
899         }
900     }
901 }