View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.Signal;
19  import io.netty.util.internal.EmptyArrays;
20  import io.netty.util.internal.InternalThreadLocalMap;
21  import io.netty.util.internal.PlatformDependent;
22  import io.netty.util.internal.StringUtil;
23  import io.netty.util.internal.logging.InternalLogger;
24  import io.netty.util.internal.logging.InternalLoggerFactory;
25  
26  import java.util.ArrayDeque;
27  import java.util.concurrent.CancellationException;
28  import java.util.concurrent.TimeUnit;
29  
30  import static java.util.concurrent.TimeUnit.*;
31  
32  public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
33  
34      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
35      private static final InternalLogger rejectedExecutionLogger =
36              InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
37  
38      private static final int MAX_LISTENER_STACK_DEPTH = 8;
39      private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");
40      private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
41      private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(new CancellationException());
42  
43      static {
44          CANCELLATION_CAUSE_HOLDER.cause.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
45      }
46  
47      EventExecutor executor;
48  
49      private volatile Object result;
50  
51      /**
52       * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
53       * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
54       */
55      private Object listeners;
56  
57      /**
58       * The list of the listeners that were added after the promise is done.  Initially {@code null} and lazily
59       * instantiated when the late listener is scheduled to be notified later.  Also used as a cached {@link Runnable}
60       * that performs the notification of the listeners it contains.
61       */
62      private LateListeners lateListeners;
63  
64      private short waiters;
65  
66      /**
67       * Creates a new instance.
68       *
69       * It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
70       *
71       * @param executor
72       *        the {@link EventExecutor} which is used to notify the promise once it is complete
73       */
74      public DefaultPromise(EventExecutor executor) {
75          if (executor == null) {
76              throw new NullPointerException("executor");
77          }
78          this.executor = executor;
79      }
80  
81      protected DefaultPromise() {
82          // only for subclasses
83          executor = null;
84      }
85  
86      protected EventExecutor executor() {
87          return executor;
88      }
89  
90      @Override
91      public boolean isCancelled() {
92          return isCancelled0(result);
93      }
94  
95      private static boolean isCancelled0(Object result) {
96          return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
97      }
98  
99      @Override
100     public boolean isCancellable() {
101         return result == null;
102     }
103 
104     @Override
105     public boolean isDone() {
106         return isDone0(result);
107     }
108 
109     private static boolean isDone0(Object result) {
110         return result != null && result != UNCANCELLABLE;
111     }
112 
113     @Override
114     public boolean isSuccess() {
115         Object result = this.result;
116         if (result == null || result == UNCANCELLABLE) {
117             return false;
118         }
119         return !(result instanceof CauseHolder);
120     }
121 
122     @Override
123     public Throwable cause() {
124         Object result = this.result;
125         if (result instanceof CauseHolder) {
126             return ((CauseHolder) result).cause;
127         }
128         return null;
129     }
130 
131     @Override
132     public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
133         if (listener == null) {
134             throw new NullPointerException("listener");
135         }
136 
137         if (isDone()) {
138             notifyLateListener(listener);
139             return this;
140         }
141 
142         synchronized (this) {
143             if (!isDone()) {
144                 if (listeners == null) {
145                     listeners = listener;
146                 } else {
147                     if (listeners instanceof DefaultFutureListeners) {
148                         ((DefaultFutureListeners) listeners).add(listener);
149                     } else {
150                         final GenericFutureListener<? extends Future<V>> firstListener =
151                                 (GenericFutureListener<? extends Future<V>>) listeners;
152                         listeners = new DefaultFutureListeners(firstListener, listener);
153                     }
154                 }
155                 return this;
156             }
157         }
158 
159         notifyLateListener(listener);
160         return this;
161     }
162 
163     @Override
164     public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
165         if (listeners == null) {
166             throw new NullPointerException("listeners");
167         }
168 
169         for (GenericFutureListener<? extends Future<? super V>> l: listeners) {
170             if (l == null) {
171                 break;
172             }
173             addListener(l);
174         }
175         return this;
176     }
177 
178     @Override
179     public Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
180         if (listener == null) {
181             throw new NullPointerException("listener");
182         }
183 
184         if (isDone()) {
185             return this;
186         }
187 
188         synchronized (this) {
189             if (!isDone()) {
190                 if (listeners instanceof DefaultFutureListeners) {
191                     ((DefaultFutureListeners) listeners).remove(listener);
192                 } else if (listeners == listener) {
193                     listeners = null;
194                 }
195             }
196         }
197 
198         return this;
199     }
200 
201     @Override
202     public Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
203         if (listeners == null) {
204             throw new NullPointerException("listeners");
205         }
206 
207         for (GenericFutureListener<? extends Future<? super V>> l: listeners) {
208             if (l == null) {
209                 break;
210             }
211             removeListener(l);
212         }
213         return this;
214     }
215 
216     @Override
217     public Promise<V> sync() throws InterruptedException {
218         await();
219         rethrowIfFailed();
220         return this;
221     }
222 
223     @Override
224     public Promise<V> syncUninterruptibly() {
225         awaitUninterruptibly();
226         rethrowIfFailed();
227         return this;
228     }
229 
230     private void rethrowIfFailed() {
231         Throwable cause = cause();
232         if (cause == null) {
233             return;
234         }
235 
236         PlatformDependent.throwException(cause);
237     }
238 
239     @Override
240     public Promise<V> await() throws InterruptedException {
241         if (isDone()) {
242             return this;
243         }
244 
245         if (Thread.interrupted()) {
246             throw new InterruptedException(toString());
247         }
248 
249         synchronized (this) {
250             while (!isDone()) {
251                 checkDeadLock();
252                 incWaiters();
253                 try {
254                     wait();
255                 } finally {
256                     decWaiters();
257                 }
258             }
259         }
260         return this;
261     }
262 
263     @Override
264     public boolean await(long timeout, TimeUnit unit)
265             throws InterruptedException {
266         return await0(unit.toNanos(timeout), true);
267     }
268 
269     @Override
270     public boolean await(long timeoutMillis) throws InterruptedException {
271         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
272     }
273 
274     @Override
275     public Promise<V> awaitUninterruptibly() {
276         if (isDone()) {
277             return this;
278         }
279 
280         boolean interrupted = false;
281         synchronized (this) {
282             while (!isDone()) {
283                 checkDeadLock();
284                 incWaiters();
285                 try {
286                     wait();
287                 } catch (InterruptedException e) {
288                     // Interrupted while waiting.
289                     interrupted = true;
290                 } finally {
291                     decWaiters();
292                 }
293             }
294         }
295 
296         if (interrupted) {
297             Thread.currentThread().interrupt();
298         }
299 
300         return this;
301     }
302 
303     @Override
304     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
305         try {
306             return await0(unit.toNanos(timeout), false);
307         } catch (InterruptedException e) {
308             // Should not be raised at all.
309             throw new InternalError();
310         }
311     }
312 
313     @Override
314     public boolean awaitUninterruptibly(long timeoutMillis) {
315         try {
316             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
317         } catch (InterruptedException e) {
318             // Should not be raised at all.
319             throw new InternalError();
320         }
321     }
322 
323     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
324         if (isDone()) {
325             return true;
326         }
327 
328         if (timeoutNanos <= 0) {
329             return isDone();
330         }
331 
332         if (interruptable && Thread.interrupted()) {
333             throw new InterruptedException(toString());
334         }
335 
336         long startTime = System.nanoTime();
337         long waitTime = timeoutNanos;
338         boolean interrupted = false;
339 
340         try {
341             synchronized (this) {
342                 if (isDone()) {
343                     return true;
344                 }
345 
346                 if (waitTime <= 0) {
347                     return isDone();
348                 }
349 
350                 checkDeadLock();
351                 incWaiters();
352                 try {
353                     for (;;) {
354                         try {
355                             wait(waitTime / 1000000, (int) (waitTime % 1000000));
356                         } catch (InterruptedException e) {
357                             if (interruptable) {
358                                 throw e;
359                             } else {
360                                 interrupted = true;
361                             }
362                         }
363 
364                         if (isDone()) {
365                             return true;
366                         } else {
367                             waitTime = timeoutNanos - (System.nanoTime() - startTime);
368                             if (waitTime <= 0) {
369                                 return isDone();
370                             }
371                         }
372                     }
373                 } finally {
374                     decWaiters();
375                 }
376             }
377         } finally {
378             if (interrupted) {
379                 Thread.currentThread().interrupt();
380             }
381         }
382     }
383 
384     /**
385      * Do deadlock checks
386      */
387     protected void checkDeadLock() {
388         EventExecutor e = executor();
389         if (e != null && e.inEventLoop()) {
390             throw new BlockingOperationException(toString());
391         }
392     }
393 
394     @Override
395     public Promise<V> setSuccess(V result) {
396         if (setSuccess0(result)) {
397             notifyListeners();
398             return this;
399         }
400         throw new IllegalStateException("complete already: " + this);
401     }
402 
403     @Override
404     public boolean trySuccess(V result) {
405         if (setSuccess0(result)) {
406             notifyListeners();
407             return true;
408         }
409         return false;
410     }
411 
412     @Override
413     public Promise<V> setFailure(Throwable cause) {
414         if (setFailure0(cause)) {
415             notifyListeners();
416             return this;
417         }
418         throw new IllegalStateException("complete already: " + this, cause);
419     }
420 
421     @Override
422     public boolean tryFailure(Throwable cause) {
423         if (setFailure0(cause)) {
424             notifyListeners();
425             return true;
426         }
427         return false;
428     }
429 
430     @Override
431     public boolean cancel(boolean mayInterruptIfRunning) {
432         Object result = this.result;
433         if (isDone0(result) || result == UNCANCELLABLE) {
434             return false;
435         }
436 
437         synchronized (this) {
438             // Allow only once.
439             result = this.result;
440             if (isDone0(result) || result == UNCANCELLABLE) {
441                 return false;
442             }
443 
444             this.result = CANCELLATION_CAUSE_HOLDER;
445             if (hasWaiters()) {
446                 notifyAll();
447             }
448         }
449 
450         notifyListeners();
451         return true;
452     }
453 
454     @Override
455     public boolean setUncancellable() {
456         Object result = this.result;
457         if (isDone0(result)) {
458             return !isCancelled0(result);
459         }
460 
461         synchronized (this) {
462             // Allow only once.
463             result = this.result;
464             if (isDone0(result)) {
465                 return !isCancelled0(result);
466             }
467 
468             this.result = UNCANCELLABLE;
469         }
470         return true;
471     }
472 
473     private boolean setFailure0(Throwable cause) {
474         if (cause == null) {
475             throw new NullPointerException("cause");
476         }
477 
478         if (isDone()) {
479             return false;
480         }
481 
482         synchronized (this) {
483             // Allow only once.
484             if (isDone()) {
485                 return false;
486             }
487 
488             result = new CauseHolder(cause);
489             if (hasWaiters()) {
490                 notifyAll();
491             }
492         }
493         return true;
494     }
495 
496     private boolean setSuccess0(V result) {
497         if (isDone()) {
498             return false;
499         }
500 
501         synchronized (this) {
502             // Allow only once.
503             if (isDone()) {
504                 return false;
505             }
506             if (result == null) {
507                 this.result = SUCCESS;
508             } else {
509                 this.result = result;
510             }
511             if (hasWaiters()) {
512                 notifyAll();
513             }
514         }
515         return true;
516     }
517 
518     @Override
519     @SuppressWarnings("unchecked")
520     public V getNow() {
521         Object result = this.result;
522         if (result instanceof CauseHolder || result == SUCCESS) {
523             return null;
524         }
525         return (V) result;
526     }
527 
528     private boolean hasWaiters() {
529         return waiters > 0;
530     }
531 
532     private void incWaiters() {
533         if (waiters == Short.MAX_VALUE) {
534             throw new IllegalStateException("too many waiters: " + this);
535         }
536         waiters ++;
537     }
538 
539     private void decWaiters() {
540         waiters --;
541     }
542 
543     private void notifyListeners() {
544         // This method doesn't need synchronization because:
545         // 1) This method is always called after synchronized (this) block.
546         //    Hence any listener list modification happens-before this method.
547         // 2) This method is called only when 'done' is true.  Once 'done'
548         //    becomes true, the listener list is never modified - see add/removeListener()
549 
550         Object listeners = this.listeners;
551         if (listeners == null) {
552             return;
553         }
554 
555         EventExecutor executor = executor();
556         if (executor.inEventLoop()) {
557             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
558             final int stackDepth = threadLocals.futureListenerStackDepth();
559             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
560                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
561                 try {
562                     if (listeners instanceof DefaultFutureListeners) {
563                         notifyListeners0(this, (DefaultFutureListeners) listeners);
564                     } else {
565                         final GenericFutureListener<? extends Future<V>> l =
566                                 (GenericFutureListener<? extends Future<V>>) listeners;
567                         notifyListener0(this, l);
568                     }
569                 } finally {
570                     this.listeners = null;
571                     threadLocals.setFutureListenerStackDepth(stackDepth);
572                 }
573                 return;
574             }
575         }
576 
577         if (listeners instanceof DefaultFutureListeners) {
578             final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
579             execute(executor, new Runnable() {
580                 @Override
581                 public void run() {
582                     notifyListeners0(DefaultPromise.this, dfl);
583                     DefaultPromise.this.listeners = null;
584                 }
585             });
586         } else {
587             final GenericFutureListener<? extends Future<V>> l =
588                     (GenericFutureListener<? extends Future<V>>) listeners;
589             execute(executor, new Runnable() {
590                 @Override
591                 public void run() {
592                     notifyListener0(DefaultPromise.this, l);
593                     DefaultPromise.this.listeners = null;
594                 }
595             });
596         }
597     }
598 
599     private static void notifyListeners0(Future<?> future, DefaultFutureListeners listeners) {
600         final GenericFutureListener<?>[] a = listeners.listeners();
601         final int size = listeners.size();
602         for (int i = 0; i < size; i ++) {
603             notifyListener0(future, a[i]);
604         }
605     }
606 
607     /**
608      * Notifies the specified listener which were added after this promise is already done.
609      * This method ensures that the specified listener is not notified until {@link #listeners} becomes {@code null}
610      * to avoid the case where the late listeners are notified even before the early listeners are notified.
611      */
612     private void notifyLateListener(final GenericFutureListener<?> l) {
613         final EventExecutor executor = executor();
614         if (executor.inEventLoop()) {
615             if (listeners == null && lateListeners == null) {
616                 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
617                 final int stackDepth = threadLocals.futureListenerStackDepth();
618                 if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
619                     threadLocals.setFutureListenerStackDepth(stackDepth + 1);
620                     try {
621                         notifyListener0(this, l);
622                     } finally {
623                         threadLocals.setFutureListenerStackDepth(stackDepth);
624                     }
625                     return;
626                 }
627             } else {
628                 LateListeners lateListeners = this.lateListeners;
629                 if (lateListeners == null) {
630                     this.lateListeners = lateListeners = new LateListeners();
631                 }
632                 lateListeners.add(l);
633                 execute(executor, lateListeners);
634                 return;
635             }
636         }
637 
638         // Add the late listener to lateListeners in the executor thread for thread safety.
639         // We could just make LateListeners extend ConcurrentLinkedQueue, but it's an overkill considering
640         // that most asynchronous applications won't execute this code path.
641         execute(executor, new LateListenerNotifier(l));
642     }
643 
644     protected static void notifyListener(
645             final EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> l) {
646 
647         if (eventExecutor.inEventLoop()) {
648             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
649             final int stackDepth = threadLocals.futureListenerStackDepth();
650             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
651                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
652                 try {
653                     notifyListener0(future, l);
654                 } finally {
655                     threadLocals.setFutureListenerStackDepth(stackDepth);
656                 }
657                 return;
658             }
659         }
660 
661         execute(eventExecutor, new Runnable() {
662             @Override
663             public void run() {
664                 notifyListener0(future, l);
665             }
666         });
667     }
668 
669     private static void execute(EventExecutor executor, Runnable task) {
670         try {
671             executor.execute(task);
672         } catch (Throwable t) {
673             rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
674         }
675     }
676 
677     @SuppressWarnings({ "unchecked", "rawtypes" })
678     static void notifyListener0(Future future, GenericFutureListener l) {
679         try {
680             l.operationComplete(future);
681         } catch (Throwable t) {
682             if (logger.isWarnEnabled()) {
683                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
684             }
685         }
686     }
687 
688     /**
689      * Returns a {@link GenericProgressiveFutureListener}, an array of {@link GenericProgressiveFutureListener}, or
690      * {@code null}.
691      */
692     private synchronized Object progressiveListeners() {
693         Object listeners = this.listeners;
694         if (listeners == null) {
695             // No listeners added
696             return null;
697         }
698 
699         if (listeners instanceof DefaultFutureListeners) {
700             // Copy DefaultFutureListeners into an array of listeners.
701             DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
702             int progressiveSize = dfl.progressiveSize();
703             switch (progressiveSize) {
704                 case 0:
705                     return null;
706                 case 1:
707                     for (GenericFutureListener<?> l: dfl.listeners()) {
708                         if (l instanceof GenericProgressiveFutureListener) {
709                             return l;
710                         }
711                     }
712                     return null;
713             }
714 
715             GenericFutureListener<?>[] array = dfl.listeners();
716             GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
717             for (int i = 0, j = 0; j < progressiveSize; i ++) {
718                 GenericFutureListener<?> l = array[i];
719                 if (l instanceof GenericProgressiveFutureListener) {
720                     copy[j ++] = (GenericProgressiveFutureListener<?>) l;
721                 }
722             }
723 
724             return copy;
725         } else if (listeners instanceof GenericProgressiveFutureListener) {
726             return listeners;
727         } else {
728             // Only one listener was added and it's not a progressive listener.
729             return null;
730         }
731     }
732 
733     @SuppressWarnings("unchecked")
734     void notifyProgressiveListeners(final long progress, final long total) {
735         final Object listeners = progressiveListeners();
736         if (listeners == null) {
737             return;
738         }
739 
740         final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
741 
742         EventExecutor executor = executor();
743         if (executor.inEventLoop()) {
744             if (listeners instanceof GenericProgressiveFutureListener[]) {
745                 notifyProgressiveListeners0(
746                         self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
747             } else {
748                 notifyProgressiveListener0(
749                         self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
750             }
751         } else {
752             if (listeners instanceof GenericProgressiveFutureListener[]) {
753                 final GenericProgressiveFutureListener<?>[] array =
754                         (GenericProgressiveFutureListener<?>[]) listeners;
755                 execute(executor, new Runnable() {
756                     @Override
757                     public void run() {
758                         notifyProgressiveListeners0(self, array, progress, total);
759                     }
760                 });
761             } else {
762                 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
763                         (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
764                 execute(executor, new Runnable() {
765                     @Override
766                     public void run() {
767                         notifyProgressiveListener0(self, l, progress, total);
768                     }
769                 });
770             }
771         }
772     }
773 
774     private static void notifyProgressiveListeners0(
775             ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
776         for (GenericProgressiveFutureListener<?> l: listeners) {
777             if (l == null) {
778                 break;
779             }
780             notifyProgressiveListener0(future, l, progress, total);
781         }
782     }
783 
784     @SuppressWarnings({ "unchecked", "rawtypes" })
785     private static void notifyProgressiveListener0(
786             ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
787         try {
788             l.operationProgressed(future, progress, total);
789         } catch (Throwable t) {
790             if (logger.isWarnEnabled()) {
791                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
792             }
793         }
794     }
795 
796     private static final class CauseHolder {
797         final Throwable cause;
798         CauseHolder(Throwable cause) {
799             this.cause = cause;
800         }
801     }
802 
803     @Override
804     public String toString() {
805         return toStringBuilder().toString();
806     }
807 
808     protected StringBuilder toStringBuilder() {
809         StringBuilder buf = new StringBuilder(64)
810             .append(StringUtil.simpleClassName(this))
811             .append('@')
812             .append(Integer.toHexString(hashCode()));
813 
814         Object result = this.result;
815         if (result == SUCCESS) {
816             buf.append("(success)");
817         } else if (result == UNCANCELLABLE) {
818             buf.append("(uncancellable)");
819         } else if (result instanceof CauseHolder) {
820             buf.append("(failure(")
821                .append(((CauseHolder) result).cause)
822                .append(')');
823         } else {
824             buf.append("(incomplete)");
825         }
826         return buf;
827     }
828 
829     private final class LateListeners extends ArrayDeque<GenericFutureListener<?>> implements Runnable {
830 
831         private static final long serialVersionUID = -687137418080392244L;
832 
833         LateListeners() {
834             super(2);
835         }
836 
837         @Override
838         public void run() {
839             if (listeners == null) {
840                 for (;;) {
841                     GenericFutureListener<?> l = poll();
842                     if (l == null) {
843                         break;
844                     }
845                     notifyListener0(DefaultPromise.this, l);
846                 }
847             } else {
848                 // Reschedule until the initial notification is done to avoid the race condition
849                 // where the notification is made in an incorrect order.
850                 execute(executor(), this);
851             }
852         }
853     }
854 
855     private final class LateListenerNotifier implements Runnable {
856         private GenericFutureListener<?> l;
857 
858         LateListenerNotifier(GenericFutureListener<?> l) {
859             this.l = l;
860         }
861 
862         @Override
863         public void run() {
864             LateListeners lateListeners = DefaultPromise.this.lateListeners;
865             if (l != null) {
866                 if (lateListeners == null) {
867                     DefaultPromise.this.lateListeners = lateListeners = new LateListeners();
868                 }
869                 lateListeners.add(l);
870                 l = null;
871             }
872 
873             lateListeners.run();
874         }
875     }
876 }