View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.InternalThreadLocalMap;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.StringUtil;
21  import io.netty.util.internal.SystemPropertyUtil;
22  import io.netty.util.internal.ThrowableUtil;
23  import io.netty.util.internal.logging.InternalLogger;
24  import io.netty.util.internal.logging.InternalLoggerFactory;
25  
26  import java.util.concurrent.CancellationException;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
29  
30  import static io.netty.util.internal.ObjectUtil.checkNotNull;
31  import static java.util.concurrent.TimeUnit.MILLISECONDS;
32  
33  public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
34      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
35      private static final InternalLogger rejectedExecutionLogger =
36              InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
37      private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
38              SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
39      @SuppressWarnings("rawtypes")
40      private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
41              AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
42      private static final Object SUCCESS = new Object();
43      private static final Object UNCANCELLABLE = new Object();
44      private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
45              new CancellationException(), DefaultPromise.class, "cancel(...)"));
46  
47      private volatile Object result;
48      private final EventExecutor executor;
49      /**
50       * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
51       * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
52       *
53       * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
54       */
55      private Object listeners;
56      /**
57       * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
58       */
59      private short waiters;
60  
61      /**
62       * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
63       * executor changes.
64       */
65      private boolean notifyingListeners;
66  
67      /**
68       * Creates a new instance.
69       *
70       * It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
71       *
72       * @param executor
73       *        the {@link EventExecutor} which is used to notify the promise once it is complete.
74       *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
75       *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
76       *        depth exceeds a threshold.
77       *
78       */
79      public DefaultPromise(EventExecutor executor) {
80          this.executor = checkNotNull(executor, "executor");
81      }
82  
83      /**
84       * See {@link #executor()} for expectations of the executor.
85       */
86      protected DefaultPromise() {
87          // only for subclasses
88          executor = null;
89      }
90  
91      @Override
92      public Promise<V> setSuccess(V result) {
93          if (setSuccess0(result)) {
94              notifyListeners();
95              return this;
96          }
97          throw new IllegalStateException("complete already: " + this);
98      }
99  
100     @Override
101     public boolean trySuccess(V result) {
102         if (setSuccess0(result)) {
103             notifyListeners();
104             return true;
105         }
106         return false;
107     }
108 
109     @Override
110     public Promise<V> setFailure(Throwable cause) {
111         if (setFailure0(cause)) {
112             notifyListeners();
113             return this;
114         }
115         throw new IllegalStateException("complete already: " + this, cause);
116     }
117 
118     @Override
119     public boolean tryFailure(Throwable cause) {
120         if (setFailure0(cause)) {
121             notifyListeners();
122             return true;
123         }
124         return false;
125     }
126 
127     @Override
128     public boolean setUncancellable() {
129         if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
130             return true;
131         }
132         Object result = this.result;
133         return !isDone0(result) || !isCancelled0(result);
134     }
135 
136     @Override
137     public boolean isSuccess() {
138         Object result = this.result;
139         return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
140     }
141 
142     @Override
143     public boolean isCancellable() {
144         return result == null;
145     }
146 
147     @Override
148     public Throwable cause() {
149         Object result = this.result;
150         return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
151     }
152 
153     @Override
154     public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
155         checkNotNull(listener, "listener");
156 
157         synchronized (this) {
158             addListener0(listener);
159         }
160 
161         if (isDone()) {
162             notifyListeners();
163         }
164 
165         return this;
166     }
167 
168     @Override
169     public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
170         checkNotNull(listeners, "listeners");
171 
172         synchronized (this) {
173             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
174                 if (listener == null) {
175                     break;
176                 }
177                 addListener0(listener);
178             }
179         }
180 
181         if (isDone()) {
182             notifyListeners();
183         }
184 
185         return this;
186     }
187 
188     @Override
189     public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
190         checkNotNull(listener, "listener");
191 
192         synchronized (this) {
193             removeListener0(listener);
194         }
195 
196         return this;
197     }
198 
199     @Override
200     public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
201         checkNotNull(listeners, "listeners");
202 
203         synchronized (this) {
204             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
205                 if (listener == null) {
206                     break;
207                 }
208                 removeListener0(listener);
209             }
210         }
211 
212         return this;
213     }
214 
215     @Override
216     public Promise<V> await() throws InterruptedException {
217         if (isDone()) {
218             return this;
219         }
220 
221         if (Thread.interrupted()) {
222             throw new InterruptedException(toString());
223         }
224 
225         checkDeadLock();
226 
227         synchronized (this) {
228             while (!isDone()) {
229                 incWaiters();
230                 try {
231                     wait();
232                 } finally {
233                     decWaiters();
234                 }
235             }
236         }
237         return this;
238     }
239 
240     @Override
241     public Promise<V> awaitUninterruptibly() {
242         if (isDone()) {
243             return this;
244         }
245 
246         checkDeadLock();
247 
248         boolean interrupted = false;
249         synchronized (this) {
250             while (!isDone()) {
251                 incWaiters();
252                 try {
253                     wait();
254                 } catch (InterruptedException e) {
255                     // Interrupted while waiting.
256                     interrupted = true;
257                 } finally {
258                     decWaiters();
259                 }
260             }
261         }
262 
263         if (interrupted) {
264             Thread.currentThread().interrupt();
265         }
266 
267         return this;
268     }
269 
270     @Override
271     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
272         return await0(unit.toNanos(timeout), true);
273     }
274 
275     @Override
276     public boolean await(long timeoutMillis) throws InterruptedException {
277         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
278     }
279 
280     @Override
281     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
282         try {
283             return await0(unit.toNanos(timeout), false);
284         } catch (InterruptedException e) {
285             // Should not be raised at all.
286             throw new InternalError();
287         }
288     }
289 
290     @Override
291     public boolean awaitUninterruptibly(long timeoutMillis) {
292         try {
293             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
294         } catch (InterruptedException e) {
295             // Should not be raised at all.
296             throw new InternalError();
297         }
298     }
299 
300     @SuppressWarnings("unchecked")
301     @Override
302     public V getNow() {
303         Object result = this.result;
304         if (result instanceof CauseHolder || result == SUCCESS) {
305             return null;
306         }
307         return (V) result;
308     }
309 
310     /**
311      * {@inheritDoc}
312      *
313      * @param mayInterruptIfRunning this value has no effect in this implementation.
314      */
315     @Override
316     public boolean cancel(boolean mayInterruptIfRunning) {
317         if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
318             checkNotifyWaiters();
319             notifyListeners();
320             return true;
321         }
322         return false;
323     }
324 
325     @Override
326     public boolean isCancelled() {
327         return isCancelled0(result);
328     }
329 
330     @Override
331     public boolean isDone() {
332         return isDone0(result);
333     }
334 
335     @Override
336     public Promise<V> sync() throws InterruptedException {
337         await();
338         rethrowIfFailed();
339         return this;
340     }
341 
342     @Override
343     public Promise<V> syncUninterruptibly() {
344         awaitUninterruptibly();
345         rethrowIfFailed();
346         return this;
347     }
348 
349     @Override
350     public String toString() {
351         return toStringBuilder().toString();
352     }
353 
354     protected StringBuilder toStringBuilder() {
355         StringBuilder buf = new StringBuilder(64)
356                 .append(StringUtil.simpleClassName(this))
357                 .append('@')
358                 .append(Integer.toHexString(hashCode()));
359 
360         Object result = this.result;
361         if (result == SUCCESS) {
362             buf.append("(success)");
363         } else if (result == UNCANCELLABLE) {
364             buf.append("(uncancellable)");
365         } else if (result instanceof CauseHolder) {
366             buf.append("(failure: ")
367                     .append(((CauseHolder) result).cause)
368                     .append(')');
369         } else if (result != null) {
370             buf.append("(success: ")
371                     .append(result)
372                     .append(')');
373         } else {
374             buf.append("(incomplete)");
375         }
376 
377         return buf;
378     }
379 
380     /**
381      * Get the executor used to notify listeners when this promise is complete.
382      * <p>
383      * It is assumed this executor will protect against {@link StackOverflowError} exceptions.
384      * The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
385      * depth exceeds a threshold.
386      * @return The executor used to notify listeners when this promise is complete.
387      */
388     protected EventExecutor executor() {
389         return executor;
390     }
391 
392     protected void checkDeadLock() {
393         EventExecutor e = executor();
394         if (e != null && e.inEventLoop()) {
395             throw new BlockingOperationException(toString());
396         }
397     }
398 
399     /**
400      * Notify a listener that a future has completed.
401      * <p>
402      * This method has a fixed depth of {@link #MAX_LISTENER_STACK_DEPTH} that will limit recursion to prevent
403      * {@link StackOverflowError} and will stop notifying listeners added after this threshold is exceeded.
404      * @param eventExecutor the executor to use to notify the listener {@code listener}.
405      * @param future the future that is complete.
406      * @param listener the listener to notify.
407      */
408     protected static void notifyListener(
409             EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
410         checkNotNull(eventExecutor, "eventExecutor");
411         checkNotNull(future, "future");
412         checkNotNull(listener, "listener");
413         notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener);
414     }
415 
416     private void notifyListeners() {
417         EventExecutor executor = executor();
418         if (executor.inEventLoop()) {
419             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
420             final int stackDepth = threadLocals.futureListenerStackDepth();
421             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
422                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
423                 try {
424                     notifyListenersNow();
425                 } finally {
426                     threadLocals.setFutureListenerStackDepth(stackDepth);
427                 }
428                 return;
429             }
430         }
431 
432         safeExecute(executor, new Runnable() {
433             @Override
434             public void run() {
435                 notifyListenersNow();
436             }
437         });
438     }
439 
440     /**
441      * The logic in this method should be identical to {@link #notifyListeners()} but
442      * cannot share code because the listener(s) cannot be cached for an instance of {@link DefaultPromise} since the
443      * listener(s) may be changed and is protected by a synchronized operation.
444      */
445     private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
446                                                                   final Future<?> future,
447                                                                   final GenericFutureListener<?> listener) {
448         if (executor.inEventLoop()) {
449             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
450             final int stackDepth = threadLocals.futureListenerStackDepth();
451             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
452                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
453                 try {
454                     notifyListener0(future, listener);
455                 } finally {
456                     threadLocals.setFutureListenerStackDepth(stackDepth);
457                 }
458                 return;
459             }
460         }
461 
462         safeExecute(executor, new Runnable() {
463             @Override
464             public void run() {
465                 notifyListener0(future, listener);
466             }
467         });
468     }
469 
470     private void notifyListenersNow() {
471         Object listeners;
472         synchronized (this) {
473             // Only proceed if there are listeners to notify and we are not already notifying listeners.
474             if (notifyingListeners || this.listeners == null) {
475                 return;
476             }
477             notifyingListeners = true;
478             listeners = this.listeners;
479             this.listeners = null;
480         }
481         for (;;) {
482             if (listeners instanceof DefaultFutureListeners) {
483                 notifyListeners0((DefaultFutureListeners) listeners);
484             } else {
485                 notifyListener0(this, (GenericFutureListener<?>) listeners);
486             }
487             synchronized (this) {
488                 if (this.listeners == null) {
489                     // Nothing can throw from within this method, so setting notifyingListeners back to false does not
490                     // need to be in a finally block.
491                     notifyingListeners = false;
492                     return;
493                 }
494                 listeners = this.listeners;
495                 this.listeners = null;
496             }
497         }
498     }
499 
500     private void notifyListeners0(DefaultFutureListeners listeners) {
501         GenericFutureListener<?>[] a = listeners.listeners();
502         int size = listeners.size();
503         for (int i = 0; i < size; i ++) {
504             notifyListener0(this, a[i]);
505         }
506     }
507 
508     @SuppressWarnings({ "unchecked", "rawtypes" })
509     private static void notifyListener0(Future future, GenericFutureListener l) {
510         try {
511             l.operationComplete(future);
512         } catch (Throwable t) {
513             logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
514         }
515     }
516 
517     private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
518         if (listeners == null) {
519             listeners = listener;
520         } else if (listeners instanceof DefaultFutureListeners) {
521             ((DefaultFutureListeners) listeners).add(listener);
522         } else {
523             listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
524         }
525     }
526 
527     private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
528         if (listeners instanceof DefaultFutureListeners) {
529             ((DefaultFutureListeners) listeners).remove(listener);
530         } else if (listeners == listener) {
531             listeners = null;
532         }
533     }
534 
535     private boolean setSuccess0(V result) {
536         return setValue0(result == null ? SUCCESS : result);
537     }
538 
539     private boolean setFailure0(Throwable cause) {
540         return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
541     }
542 
543     private boolean setValue0(Object objResult) {
544         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
545             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
546             checkNotifyWaiters();
547             return true;
548         }
549         return false;
550     }
551 
552     private synchronized void checkNotifyWaiters() {
553         if (waiters > 0) {
554             notifyAll();
555         }
556     }
557 
558     private void incWaiters() {
559         if (waiters == Short.MAX_VALUE) {
560             throw new IllegalStateException("too many waiters: " + this);
561         }
562         ++waiters;
563     }
564 
565     private void decWaiters() {
566         --waiters;
567     }
568 
569     private void rethrowIfFailed() {
570         Throwable cause = cause();
571         if (cause == null) {
572             return;
573         }
574 
575         PlatformDependent.throwException(cause);
576     }
577 
578     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
579         if (isDone()) {
580             return true;
581         }
582 
583         if (timeoutNanos <= 0) {
584             return isDone();
585         }
586 
587         if (interruptable && Thread.interrupted()) {
588             throw new InterruptedException(toString());
589         }
590 
591         checkDeadLock();
592 
593         long startTime = System.nanoTime();
594         long waitTime = timeoutNanos;
595         boolean interrupted = false;
596         try {
597             for (;;) {
598                 synchronized (this) {
599                     if (isDone()) {
600                         return true;
601                     }
602                     incWaiters();
603                     try {
604                         wait(waitTime / 1000000, (int) (waitTime % 1000000));
605                     } catch (InterruptedException e) {
606                         if (interruptable) {
607                             throw e;
608                         } else {
609                             interrupted = true;
610                         }
611                     } finally {
612                         decWaiters();
613                     }
614                 }
615                 if (isDone()) {
616                     return true;
617                 } else {
618                     waitTime = timeoutNanos - (System.nanoTime() - startTime);
619                     if (waitTime <= 0) {
620                         return isDone();
621                     }
622                 }
623             }
624         } finally {
625             if (interrupted) {
626                 Thread.currentThread().interrupt();
627             }
628         }
629     }
630 
631     /**
632      * Notify all progressive listeners.
633      * <p>
634      * No attempt is made to ensure notification order if multiple calls are made to this method before
635      * the original invocation completes.
636      * <p>
637      * This will do an iteration over all listeners to get all of type {@link GenericProgressiveFutureListener}s.
638      * @param progress the new progress.
639      * @param total the total progress.
640      */
641     @SuppressWarnings("unchecked")
642     void notifyProgressiveListeners(final long progress, final long total) {
643         final Object listeners = progressiveListeners();
644         if (listeners == null) {
645             return;
646         }
647 
648         final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
649 
650         EventExecutor executor = executor();
651         if (executor.inEventLoop()) {
652             if (listeners instanceof GenericProgressiveFutureListener[]) {
653                 notifyProgressiveListeners0(
654                         self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
655             } else {
656                 notifyProgressiveListener0(
657                         self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
658             }
659         } else {
660             if (listeners instanceof GenericProgressiveFutureListener[]) {
661                 final GenericProgressiveFutureListener<?>[] array =
662                         (GenericProgressiveFutureListener<?>[]) listeners;
663                 safeExecute(executor, new Runnable() {
664                     @Override
665                     public void run() {
666                         notifyProgressiveListeners0(self, array, progress, total);
667                     }
668                 });
669             } else {
670                 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
671                         (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
672                 safeExecute(executor, new Runnable() {
673                     @Override
674                     public void run() {
675                         notifyProgressiveListener0(self, l, progress, total);
676                     }
677                 });
678             }
679         }
680     }
681 
682     /**
683      * Returns a {@link GenericProgressiveFutureListener}, an array of {@link GenericProgressiveFutureListener}, or
684      * {@code null}.
685      */
686     private synchronized Object progressiveListeners() {
687         Object listeners = this.listeners;
688         if (listeners == null) {
689             // No listeners added
690             return null;
691         }
692 
693         if (listeners instanceof DefaultFutureListeners) {
694             // Copy DefaultFutureListeners into an array of listeners.
695             DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
696             int progressiveSize = dfl.progressiveSize();
697             switch (progressiveSize) {
698                 case 0:
699                     return null;
700                 case 1:
701                     for (GenericFutureListener<?> l: dfl.listeners()) {
702                         if (l instanceof GenericProgressiveFutureListener) {
703                             return l;
704                         }
705                     }
706                     return null;
707             }
708 
709             GenericFutureListener<?>[] array = dfl.listeners();
710             GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
711             for (int i = 0, j = 0; j < progressiveSize; i ++) {
712                 GenericFutureListener<?> l = array[i];
713                 if (l instanceof GenericProgressiveFutureListener) {
714                     copy[j ++] = (GenericProgressiveFutureListener<?>) l;
715                 }
716             }
717 
718             return copy;
719         } else if (listeners instanceof GenericProgressiveFutureListener) {
720             return listeners;
721         } else {
722             // Only one listener was added and it's not a progressive listener.
723             return null;
724         }
725     }
726 
727     private static void notifyProgressiveListeners0(
728             ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
729         for (GenericProgressiveFutureListener<?> l: listeners) {
730             if (l == null) {
731                 break;
732             }
733             notifyProgressiveListener0(future, l, progress, total);
734         }
735     }
736 
737     @SuppressWarnings({ "unchecked", "rawtypes" })
738     private static void notifyProgressiveListener0(
739             ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
740         try {
741             l.operationProgressed(future, progress, total);
742         } catch (Throwable t) {
743             logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
744         }
745     }
746 
747     private static boolean isCancelled0(Object result) {
748         return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
749     }
750 
751     private static boolean isDone0(Object result) {
752         return result != null && result != UNCANCELLABLE;
753     }
754 
755     private static final class CauseHolder {
756         final Throwable cause;
757         CauseHolder(Throwable cause) {
758             this.cause = cause;
759         }
760     }
761 
762     private static void safeExecute(EventExecutor executor, Runnable task) {
763         try {
764             executor.execute(task);
765         } catch (Throwable t) {
766             rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
767         }
768     }
769 }