View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.InternalThreadLocalMap;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.StringUtil;
21  import io.netty.util.internal.SystemPropertyUtil;
22  import io.netty.util.internal.ThrowableUtil;
23  import io.netty.util.internal.logging.InternalLogger;
24  import io.netty.util.internal.logging.InternalLoggerFactory;
25  
26  import java.util.concurrent.CancellationException;
27  import java.util.concurrent.ExecutionException;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.TimeoutException;
30  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
31  
32  import static io.netty.util.internal.ObjectUtil.checkNotNull;
33  import static java.util.concurrent.TimeUnit.MILLISECONDS;
34  
35  public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
36      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
37      private static final InternalLogger rejectedExecutionLogger =
38              InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
39      private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
40              SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
41      @SuppressWarnings("rawtypes")
42      private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
43              AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
44      private static final Object SUCCESS = new Object();
45      private static final Object UNCANCELLABLE = new Object();
46      private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
47              StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
48      private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
49  
50      private volatile Object result;
51      private final EventExecutor executor;
52      /**
53       * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
54       * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
55       *
56       * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
57       */
58      private Object listeners;
59      /**
60       * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
61       */
62      private short waiters;
63  
64      /**
65       * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
66       * executor changes.
67       */
68      private boolean notifyingListeners;
69  
70      /**
71       * Creates a new instance.
72       *
73       * It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
74       *
75       * @param executor
76       *        the {@link EventExecutor} which is used to notify the promise once it is complete.
77       *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
78       *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
79       *        depth exceeds a threshold.
80       *
81       */
82      public DefaultPromise(EventExecutor executor) {
83          this.executor = checkNotNull(executor, "executor");
84      }
85  
86      /**
87       * See {@link #executor()} for expectations of the executor.
88       */
89      protected DefaultPromise() {
90          // only for subclasses
91          executor = null;
92      }
93  
94      @Override
95      public Promise<V> setSuccess(V result) {
96          if (setSuccess0(result)) {
97              return this;
98          }
99          throw new IllegalStateException("complete already: " + this);
100     }
101 
102     @Override
103     public boolean trySuccess(V result) {
104         return setSuccess0(result);
105     }
106 
107     @Override
108     public Promise<V> setFailure(Throwable cause) {
109         if (setFailure0(cause)) {
110             return this;
111         }
112         throw new IllegalStateException("complete already: " + this, cause);
113     }
114 
115     @Override
116     public boolean tryFailure(Throwable cause) {
117         return setFailure0(cause);
118     }
119 
120     @Override
121     public boolean setUncancellable() {
122         if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
123             return true;
124         }
125         Object result = this.result;
126         return !isDone0(result) || !isCancelled0(result);
127     }
128 
129     @Override
130     public boolean isSuccess() {
131         Object result = this.result;
132         return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
133     }
134 
135     @Override
136     public boolean isCancellable() {
137         return result == null;
138     }
139 
140     private static final class LeanCancellationException extends CancellationException {
141         private static final long serialVersionUID = 2794674970981187807L;
142 
143         // Suppress a warning since the method doesn't need synchronization
144         @Override
145         public Throwable fillInStackTrace() {   // lgtm[java/non-sync-override]
146             setStackTrace(CANCELLATION_STACK);
147             return this;
148         }
149 
150         @Override
151         public String toString() {
152             return CancellationException.class.getName();
153         }
154     }
155 
156     @Override
157     public Throwable cause() {
158         return cause0(result);
159     }
160 
161     private Throwable cause0(Object result) {
162         if (!(result instanceof CauseHolder)) {
163             return null;
164         }
165         if (result == CANCELLATION_CAUSE_HOLDER) {
166             CancellationException ce = new LeanCancellationException();
167             if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
168                 return ce;
169             }
170             result = this.result;
171         }
172         return ((CauseHolder) result).cause;
173     }
174 
175     @Override
176     public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
177         checkNotNull(listener, "listener");
178 
179         synchronized (this) {
180             addListener0(listener);
181         }
182 
183         if (isDone()) {
184             notifyListeners();
185         }
186 
187         return this;
188     }
189 
190     @Override
191     public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
192         checkNotNull(listeners, "listeners");
193 
194         synchronized (this) {
195             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
196                 if (listener == null) {
197                     break;
198                 }
199                 addListener0(listener);
200             }
201         }
202 
203         if (isDone()) {
204             notifyListeners();
205         }
206 
207         return this;
208     }
209 
210     @Override
211     public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
212         checkNotNull(listener, "listener");
213 
214         synchronized (this) {
215             removeListener0(listener);
216         }
217 
218         return this;
219     }
220 
221     @Override
222     public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
223         checkNotNull(listeners, "listeners");
224 
225         synchronized (this) {
226             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
227                 if (listener == null) {
228                     break;
229                 }
230                 removeListener0(listener);
231             }
232         }
233 
234         return this;
235     }
236 
237     @Override
238     public Promise<V> await() throws InterruptedException {
239         if (isDone()) {
240             return this;
241         }
242 
243         if (Thread.interrupted()) {
244             throw new InterruptedException(toString());
245         }
246 
247         checkDeadLock();
248 
249         synchronized (this) {
250             while (!isDone()) {
251                 incWaiters();
252                 try {
253                     wait();
254                 } finally {
255                     decWaiters();
256                 }
257             }
258         }
259         return this;
260     }
261 
262     @Override
263     public Promise<V> awaitUninterruptibly() {
264         if (isDone()) {
265             return this;
266         }
267 
268         checkDeadLock();
269 
270         boolean interrupted = false;
271         synchronized (this) {
272             while (!isDone()) {
273                 incWaiters();
274                 try {
275                     wait();
276                 } catch (InterruptedException e) {
277                     // Interrupted while waiting.
278                     interrupted = true;
279                 } finally {
280                     decWaiters();
281                 }
282             }
283         }
284 
285         if (interrupted) {
286             Thread.currentThread().interrupt();
287         }
288 
289         return this;
290     }
291 
292     @Override
293     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
294         return await0(unit.toNanos(timeout), true);
295     }
296 
297     @Override
298     public boolean await(long timeoutMillis) throws InterruptedException {
299         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
300     }
301 
302     @Override
303     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
304         try {
305             return await0(unit.toNanos(timeout), false);
306         } catch (InterruptedException e) {
307             // Should not be raised at all.
308             throw new InternalError();
309         }
310     }
311 
312     @Override
313     public boolean awaitUninterruptibly(long timeoutMillis) {
314         try {
315             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
316         } catch (InterruptedException e) {
317             // Should not be raised at all.
318             throw new InternalError();
319         }
320     }
321 
322     @SuppressWarnings("unchecked")
323     @Override
324     public V getNow() {
325         Object result = this.result;
326         if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
327             return null;
328         }
329         return (V) result;
330     }
331 
332     @SuppressWarnings("unchecked")
333     @Override
334     public V get() throws InterruptedException, ExecutionException {
335         Object result = this.result;
336         if (!isDone0(result)) {
337             await();
338             result = this.result;
339         }
340         if (result == SUCCESS || result == UNCANCELLABLE) {
341             return null;
342         }
343         Throwable cause = cause0(result);
344         if (cause == null) {
345             return (V) result;
346         }
347         if (cause instanceof CancellationException) {
348             throw (CancellationException) cause;
349         }
350         throw new ExecutionException(cause);
351     }
352 
353     @SuppressWarnings("unchecked")
354     @Override
355     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
356         Object result = this.result;
357         if (!isDone0(result)) {
358             if (!await(timeout, unit)) {
359                 throw new TimeoutException();
360             }
361             result = this.result;
362         }
363         if (result == SUCCESS || result == UNCANCELLABLE) {
364             return null;
365         }
366         Throwable cause = cause0(result);
367         if (cause == null) {
368             return (V) result;
369         }
370         if (cause instanceof CancellationException) {
371             throw (CancellationException) cause;
372         }
373         throw new ExecutionException(cause);
374     }
375 
376     /**
377      * {@inheritDoc}
378      *
379      * @param mayInterruptIfRunning this value has no effect in this implementation.
380      */
381     @Override
382     public boolean cancel(boolean mayInterruptIfRunning) {
383         if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
384             if (checkNotifyWaiters()) {
385                 notifyListeners();
386             }
387             return true;
388         }
389         return false;
390     }
391 
392     @Override
393     public boolean isCancelled() {
394         return isCancelled0(result);
395     }
396 
397     @Override
398     public boolean isDone() {
399         return isDone0(result);
400     }
401 
402     @Override
403     public Promise<V> sync() throws InterruptedException {
404         await();
405         rethrowIfFailed();
406         return this;
407     }
408 
409     @Override
410     public Promise<V> syncUninterruptibly() {
411         awaitUninterruptibly();
412         rethrowIfFailed();
413         return this;
414     }
415 
416     @Override
417     public String toString() {
418         return toStringBuilder().toString();
419     }
420 
421     protected StringBuilder toStringBuilder() {
422         StringBuilder buf = new StringBuilder(64)
423                 .append(StringUtil.simpleClassName(this))
424                 .append('@')
425                 .append(Integer.toHexString(hashCode()));
426 
427         Object result = this.result;
428         if (result == SUCCESS) {
429             buf.append("(success)");
430         } else if (result == UNCANCELLABLE) {
431             buf.append("(uncancellable)");
432         } else if (result instanceof CauseHolder) {
433             buf.append("(failure: ")
434                     .append(((CauseHolder) result).cause)
435                     .append(')');
436         } else if (result != null) {
437             buf.append("(success: ")
438                     .append(result)
439                     .append(')');
440         } else {
441             buf.append("(incomplete)");
442         }
443 
444         return buf;
445     }
446 
447     /**
448      * Get the executor used to notify listeners when this promise is complete.
449      * <p>
450      * It is assumed this executor will protect against {@link StackOverflowError} exceptions.
451      * The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
452      * depth exceeds a threshold.
453      * @return The executor used to notify listeners when this promise is complete.
454      */
455     protected EventExecutor executor() {
456         return executor;
457     }
458 
459     protected void checkDeadLock() {
460         EventExecutor e = executor();
461         if (e != null && e.inEventLoop()) {
462             throw new BlockingOperationException(toString());
463         }
464     }
465 
466     /**
467      * Notify a listener that a future has completed.
468      * <p>
469      * This method has a fixed depth of {@link #MAX_LISTENER_STACK_DEPTH} that will limit recursion to prevent
470      * {@link StackOverflowError} and will stop notifying listeners added after this threshold is exceeded.
471      * @param eventExecutor the executor to use to notify the listener {@code listener}.
472      * @param future the future that is complete.
473      * @param listener the listener to notify.
474      */
475     protected static void notifyListener(
476             EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
477         notifyListenerWithStackOverFlowProtection(
478                 checkNotNull(eventExecutor, "eventExecutor"),
479                 checkNotNull(future, "future"),
480                 checkNotNull(listener, "listener"));
481     }
482 
483     private void notifyListeners() {
484         EventExecutor executor = executor();
485         if (executor.inEventLoop()) {
486             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
487             final int stackDepth = threadLocals.futureListenerStackDepth();
488             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
489                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
490                 try {
491                     notifyListenersNow();
492                 } finally {
493                     threadLocals.setFutureListenerStackDepth(stackDepth);
494                 }
495                 return;
496             }
497         }
498 
499         safeExecute(executor, new Runnable() {
500             @Override
501             public void run() {
502                 notifyListenersNow();
503             }
504         });
505     }
506 
507     /**
508      * The logic in this method should be identical to {@link #notifyListeners()} but
509      * cannot share code because the listener(s) cannot be cached for an instance of {@link DefaultPromise} since the
510      * listener(s) may be changed and is protected by a synchronized operation.
511      */
512     private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
513                                                                   final Future<?> future,
514                                                                   final GenericFutureListener<?> listener) {
515         if (executor.inEventLoop()) {
516             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
517             final int stackDepth = threadLocals.futureListenerStackDepth();
518             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
519                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
520                 try {
521                     notifyListener0(future, listener);
522                 } finally {
523                     threadLocals.setFutureListenerStackDepth(stackDepth);
524                 }
525                 return;
526             }
527         }
528 
529         safeExecute(executor, new Runnable() {
530             @Override
531             public void run() {
532                 notifyListener0(future, listener);
533             }
534         });
535     }
536 
537     private void notifyListenersNow() {
538         Object listeners;
539         synchronized (this) {
540             // Only proceed if there are listeners to notify and we are not already notifying listeners.
541             if (notifyingListeners || this.listeners == null) {
542                 return;
543             }
544             notifyingListeners = true;
545             listeners = this.listeners;
546             this.listeners = null;
547         }
548         for (;;) {
549             if (listeners instanceof DefaultFutureListeners) {
550                 notifyListeners0((DefaultFutureListeners) listeners);
551             } else {
552                 notifyListener0(this, (GenericFutureListener<?>) listeners);
553             }
554             synchronized (this) {
555                 if (this.listeners == null) {
556                     // Nothing can throw from within this method, so setting notifyingListeners back to false does not
557                     // need to be in a finally block.
558                     notifyingListeners = false;
559                     return;
560                 }
561                 listeners = this.listeners;
562                 this.listeners = null;
563             }
564         }
565     }
566 
567     private void notifyListeners0(DefaultFutureListeners listeners) {
568         GenericFutureListener<?>[] a = listeners.listeners();
569         int size = listeners.size();
570         for (int i = 0; i < size; i ++) {
571             notifyListener0(this, a[i]);
572         }
573     }
574 
575     @SuppressWarnings({ "unchecked", "rawtypes" })
576     private static void notifyListener0(Future future, GenericFutureListener l) {
577         try {
578             l.operationComplete(future);
579         } catch (Throwable t) {
580             if (logger.isWarnEnabled()) {
581                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
582             }
583         }
584     }
585 
586     private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
587         if (listeners == null) {
588             listeners = listener;
589         } else if (listeners instanceof DefaultFutureListeners) {
590             ((DefaultFutureListeners) listeners).add(listener);
591         } else {
592             listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
593         }
594     }
595 
596     private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
597         if (listeners instanceof DefaultFutureListeners) {
598             ((DefaultFutureListeners) listeners).remove(listener);
599         } else if (listeners == listener) {
600             listeners = null;
601         }
602     }
603 
604     private boolean setSuccess0(V result) {
605         return setValue0(result == null ? SUCCESS : result);
606     }
607 
608     private boolean setFailure0(Throwable cause) {
609         return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
610     }
611 
612     private boolean setValue0(Object objResult) {
613         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
614             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
615             if (checkNotifyWaiters()) {
616                 notifyListeners();
617             }
618             return true;
619         }
620         return false;
621     }
622 
623     /**
624      * Check if there are any waiters and if so notify these.
625      * @return {@code true} if there are any listeners attached to the promise, {@code false} otherwise.
626      */
627     private synchronized boolean checkNotifyWaiters() {
628         if (waiters > 0) {
629             notifyAll();
630         }
631         return listeners != null;
632     }
633 
634     private void incWaiters() {
635         if (waiters == Short.MAX_VALUE) {
636             throw new IllegalStateException("too many waiters: " + this);
637         }
638         ++waiters;
639     }
640 
641     private void decWaiters() {
642         --waiters;
643     }
644 
645     private void rethrowIfFailed() {
646         Throwable cause = cause();
647         if (cause == null) {
648             return;
649         }
650 
651         PlatformDependent.throwException(cause);
652     }
653 
654     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
655         if (isDone()) {
656             return true;
657         }
658 
659         if (timeoutNanos <= 0) {
660             return isDone();
661         }
662 
663         if (interruptable && Thread.interrupted()) {
664             throw new InterruptedException(toString());
665         }
666 
667         checkDeadLock();
668 
669         // Start counting time from here instead of the first line of this method,
670         // to avoid/postpone performance cost of System.nanoTime().
671         final long startTime = System.nanoTime();
672         synchronized (this) {
673             boolean interrupted = false;
674             try {
675                 long waitTime = timeoutNanos;
676                 while (!isDone() && waitTime > 0) {
677                     incWaiters();
678                     try {
679                         wait(waitTime / 1000000, (int) (waitTime % 1000000));
680                     } catch (InterruptedException e) {
681                         if (interruptable) {
682                             throw e;
683                         } else {
684                             interrupted = true;
685                         }
686                     } finally {
687                         decWaiters();
688                     }
689                     // Check isDone() in advance, try to avoid calculating the elapsed time later.
690                     if (isDone()) {
691                         return true;
692                     }
693                     // Calculate the elapsed time here instead of in the while condition,
694                     // try to avoid performance cost of System.nanoTime() in the first loop of while.
695                     waitTime = timeoutNanos - (System.nanoTime() - startTime);
696                 }
697                 return isDone();
698             } finally {
699                 if (interrupted) {
700                     Thread.currentThread().interrupt();
701                 }
702             }
703         }
704     }
705 
706     /**
707      * Notify all progressive listeners.
708      * <p>
709      * No attempt is made to ensure notification order if multiple calls are made to this method before
710      * the original invocation completes.
711      * <p>
712      * This will do an iteration over all listeners to get all of type {@link GenericProgressiveFutureListener}s.
713      * @param progress the new progress.
714      * @param total the total progress.
715      */
716     @SuppressWarnings("unchecked")
717     void notifyProgressiveListeners(final long progress, final long total) {
718         final Object listeners = progressiveListeners();
719         if (listeners == null) {
720             return;
721         }
722 
723         final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
724 
725         EventExecutor executor = executor();
726         if (executor.inEventLoop()) {
727             if (listeners instanceof GenericProgressiveFutureListener[]) {
728                 notifyProgressiveListeners0(
729                         self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
730             } else {
731                 notifyProgressiveListener0(
732                         self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
733             }
734         } else {
735             if (listeners instanceof GenericProgressiveFutureListener[]) {
736                 final GenericProgressiveFutureListener<?>[] array =
737                         (GenericProgressiveFutureListener<?>[]) listeners;
738                 safeExecute(executor, new Runnable() {
739                     @Override
740                     public void run() {
741                         notifyProgressiveListeners0(self, array, progress, total);
742                     }
743                 });
744             } else {
745                 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
746                         (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
747                 safeExecute(executor, new Runnable() {
748                     @Override
749                     public void run() {
750                         notifyProgressiveListener0(self, l, progress, total);
751                     }
752                 });
753             }
754         }
755     }
756 
757     /**
758      * Returns a {@link GenericProgressiveFutureListener}, an array of {@link GenericProgressiveFutureListener}, or
759      * {@code null}.
760      */
761     private synchronized Object progressiveListeners() {
762         Object listeners = this.listeners;
763         if (listeners == null) {
764             // No listeners added
765             return null;
766         }
767 
768         if (listeners instanceof DefaultFutureListeners) {
769             // Copy DefaultFutureListeners into an array of listeners.
770             DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
771             int progressiveSize = dfl.progressiveSize();
772             switch (progressiveSize) {
773                 case 0:
774                     return null;
775                 case 1:
776                     for (GenericFutureListener<?> l: dfl.listeners()) {
777                         if (l instanceof GenericProgressiveFutureListener) {
778                             return l;
779                         }
780                     }
781                     return null;
782             }
783 
784             GenericFutureListener<?>[] array = dfl.listeners();
785             GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
786             for (int i = 0, j = 0; j < progressiveSize; i ++) {
787                 GenericFutureListener<?> l = array[i];
788                 if (l instanceof GenericProgressiveFutureListener) {
789                     copy[j ++] = (GenericProgressiveFutureListener<?>) l;
790                 }
791             }
792 
793             return copy;
794         } else if (listeners instanceof GenericProgressiveFutureListener) {
795             return listeners;
796         } else {
797             // Only one listener was added and it's not a progressive listener.
798             return null;
799         }
800     }
801 
802     private static void notifyProgressiveListeners0(
803             ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
804         for (GenericProgressiveFutureListener<?> l: listeners) {
805             if (l == null) {
806                 break;
807             }
808             notifyProgressiveListener0(future, l, progress, total);
809         }
810     }
811 
812     @SuppressWarnings({ "unchecked", "rawtypes" })
813     private static void notifyProgressiveListener0(
814             ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
815         try {
816             l.operationProgressed(future, progress, total);
817         } catch (Throwable t) {
818             if (logger.isWarnEnabled()) {
819                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
820             }
821         }
822     }
823 
824     private static boolean isCancelled0(Object result) {
825         return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
826     }
827 
828     private static boolean isDone0(Object result) {
829         return result != null && result != UNCANCELLABLE;
830     }
831 
832     private static final class CauseHolder {
833         final Throwable cause;
834         CauseHolder(Throwable cause) {
835             this.cause = cause;
836         }
837     }
838 
839     private static void safeExecute(EventExecutor executor, Runnable task) {
840         try {
841             executor.execute(task);
842         } catch (Throwable t) {
843             rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
844         }
845     }
846 
847     private static final class StacklessCancellationException extends CancellationException {
848 
849         private static final long serialVersionUID = -2974906711413716191L;
850 
851         private StacklessCancellationException() { }
852 
853         // Override fillInStackTrace() so we not populate the backtrace via a native call and so leak the
854         // Classloader.
855         @Override
856         public Throwable fillInStackTrace() {
857             return this;
858         }
859 
860         static StacklessCancellationException newInstance(Class<?> clazz, String method) {
861             return ThrowableUtil.unknownStackTrace(new StacklessCancellationException(), clazz, method);
862         }
863     }
864 }