View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.InternalThreadLocalMap;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.StringUtil;
21  import io.netty.util.internal.SystemPropertyUtil;
22  import io.netty.util.internal.ThrowableUtil;
23  import io.netty.util.internal.logging.InternalLogger;
24  import io.netty.util.internal.logging.InternalLoggerFactory;
25  
26  import java.util.concurrent.CancellationException;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
29  
30  import static io.netty.util.internal.ObjectUtil.checkNotNull;
31  import static java.util.concurrent.TimeUnit.MILLISECONDS;
32  
33  public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    // Logger used when a listener throws from one of its callbacks.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
    // Dedicated logger (name suffixed with ".rejectedExecution") used when a notification task cannot be submitted.
    private static final InternalLogger rejectedExecutionLogger =
            InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
    // Maximum nesting of direct listener notifications on one thread before the notification is handed to the
    // executor instead. Because of Math.min(8, ...) the system property can only lower this limit, never raise it.
    private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
            SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
    // Compare-and-set access to the volatile "result" field.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    // Sentinel stored in "result" when the promise succeeded with a null value.
    private static final Object SUCCESS = new Object();
    // Sentinel stored in "result" by setUncancellable(); the promise is still incomplete in this state.
    private static final Object UNCANCELLABLE = new Object();
    // Shared holder stored in "result" by cancel(...); wraps a CancellationException.
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
            new CancellationException(), DefaultPromise.class, "cancel(...)"));
46  
    /**
     * Completion state. One of: {@code null} (incomplete), {@link #UNCANCELLABLE} (incomplete, cannot be cancelled),
     * {@link #SUCCESS} (succeeded with {@code null}), a {@link CauseHolder} (failed or cancelled), or the
     * non-null success value itself. Only changed through {@link #RESULT_UPDATER}.
     */
    private volatile Object result;
    /**
     * Executor returned by {@link #executor()}; {@code null} when the protected no-arg constructor was used.
     */
    private final EventExecutor executor;
    /**
     * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
     * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
     *
     * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
     */
    private Object listeners;
    /**
     * Number of threads currently blocked in one of the await methods.
     *
     * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
     */
    private short waiters;

    /**
     * {@code true} while a thread is draining {@link #listeners} in {@link #notifyListenersNow()}.
     *
     * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
     * executor changes.
     */
    private boolean notifyingListeners;
66  
67      /**
68       * Creates a new instance.
69       *
70       * It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
71       *
72       * @param executor
73       *        the {@link EventExecutor} which is used to notify the promise once it is complete.
74       *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
75       *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
76       *        depth exceeds a threshold.
77       *
78       */
79      public DefaultPromise(EventExecutor executor) {
80          this.executor = checkNotNull(executor, "executor");
81      }
82  
83      /**
84       * See {@link #executor()} for expectations of the executor.
85       */
86      protected DefaultPromise() {
87          // only for subclasses
88          executor = null;
89      }
90  
91      @Override
92      public Promise<V> setSuccess(V result) {
93          if (setSuccess0(result)) {
94              notifyListeners();
95              return this;
96          }
97          throw new IllegalStateException("complete already: " + this);
98      }
99  
100     @Override
101     public boolean trySuccess(V result) {
102         if (setSuccess0(result)) {
103             notifyListeners();
104             return true;
105         }
106         return false;
107     }
108 
109     @Override
110     public Promise<V> setFailure(Throwable cause) {
111         if (setFailure0(cause)) {
112             notifyListeners();
113             return this;
114         }
115         throw new IllegalStateException("complete already: " + this, cause);
116     }
117 
118     @Override
119     public boolean tryFailure(Throwable cause) {
120         if (setFailure0(cause)) {
121             notifyListeners();
122             return true;
123         }
124         return false;
125     }
126 
127     @Override
128     public boolean setUncancellable() {
129         if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
130             return true;
131         }
132         Object result = this.result;
133         return !isDone0(result) || !isCancelled0(result);
134     }
135 
136     @Override
137     public boolean isSuccess() {
138         Object result = this.result;
139         return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
140     }
141 
142     @Override
143     public boolean isCancellable() {
144         return result == null;
145     }
146 
147     @Override
148     public Throwable cause() {
149         Object result = this.result;
150         return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
151     }
152 
153     @Override
154     public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
155         checkNotNull(listener, "listener");
156 
157         synchronized (this) {
158             addListener0(listener);
159         }
160 
161         if (isDone()) {
162             notifyListeners();
163         }
164 
165         return this;
166     }
167 
168     @Override
169     public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
170         checkNotNull(listeners, "listeners");
171 
172         synchronized (this) {
173             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
174                 if (listener == null) {
175                     break;
176                 }
177                 addListener0(listener);
178             }
179         }
180 
181         if (isDone()) {
182             notifyListeners();
183         }
184 
185         return this;
186     }
187 
188     @Override
189     public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
190         checkNotNull(listener, "listener");
191 
192         synchronized (this) {
193             removeListener0(listener);
194         }
195 
196         return this;
197     }
198 
199     @Override
200     public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
201         checkNotNull(listeners, "listeners");
202 
203         synchronized (this) {
204             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
205                 if (listener == null) {
206                     break;
207                 }
208                 removeListener0(listener);
209             }
210         }
211 
212         return this;
213     }
214 
215     @Override
216     public Promise<V> await() throws InterruptedException {
217         if (isDone()) {
218             return this;
219         }
220 
221         if (Thread.interrupted()) {
222             throw new InterruptedException(toString());
223         }
224 
225         checkDeadLock();
226 
227         synchronized (this) {
228             while (!isDone()) {
229                 incWaiters();
230                 try {
231                     wait();
232                 } finally {
233                     decWaiters();
234                 }
235             }
236         }
237         return this;
238     }
239 
240     @Override
241     public Promise<V> awaitUninterruptibly() {
242         if (isDone()) {
243             return this;
244         }
245 
246         checkDeadLock();
247 
248         boolean interrupted = false;
249         synchronized (this) {
250             while (!isDone()) {
251                 incWaiters();
252                 try {
253                     wait();
254                 } catch (InterruptedException e) {
255                     // Interrupted while waiting.
256                     interrupted = true;
257                 } finally {
258                     decWaiters();
259                 }
260             }
261         }
262 
263         if (interrupted) {
264             Thread.currentThread().interrupt();
265         }
266 
267         return this;
268     }
269 
270     @Override
271     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
272         return await0(unit.toNanos(timeout), true);
273     }
274 
275     @Override
276     public boolean await(long timeoutMillis) throws InterruptedException {
277         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
278     }
279 
280     @Override
281     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
282         try {
283             return await0(unit.toNanos(timeout), false);
284         } catch (InterruptedException e) {
285             // Should not be raised at all.
286             throw new InternalError();
287         }
288     }
289 
290     @Override
291     public boolean awaitUninterruptibly(long timeoutMillis) {
292         try {
293             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
294         } catch (InterruptedException e) {
295             // Should not be raised at all.
296             throw new InternalError();
297         }
298     }
299 
300     @SuppressWarnings("unchecked")
301     @Override
302     public V getNow() {
303         Object result = this.result;
304         if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
305             return null;
306         }
307         return (V) result;
308     }
309 
310     /**
311      * {@inheritDoc}
312      *
313      * @param mayInterruptIfRunning this value has no effect in this implementation.
314      */
315     @Override
316     public boolean cancel(boolean mayInterruptIfRunning) {
317         if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
318             checkNotifyWaiters();
319             notifyListeners();
320             return true;
321         }
322         return false;
323     }
324 
325     @Override
326     public boolean isCancelled() {
327         return isCancelled0(result);
328     }
329 
330     @Override
331     public boolean isDone() {
332         return isDone0(result);
333     }
334 
    @Override
    public Promise<V> sync() throws InterruptedException {
        // Block until complete, then rethrow the failure cause if there is one.
        await();
        rethrowIfFailed();
        return this;
    }
341 
    @Override
    public Promise<V> syncUninterruptibly() {
        // Block (ignoring interrupts) until complete, then rethrow the failure cause if there is one.
        awaitUninterruptibly();
        rethrowIfFailed();
        return this;
    }
348 
349     @Override
350     public String toString() {
351         return toStringBuilder().toString();
352     }
353 
354     protected StringBuilder toStringBuilder() {
355         StringBuilder buf = new StringBuilder(64)
356                 .append(StringUtil.simpleClassName(this))
357                 .append('@')
358                 .append(Integer.toHexString(hashCode()));
359 
360         Object result = this.result;
361         if (result == SUCCESS) {
362             buf.append("(success)");
363         } else if (result == UNCANCELLABLE) {
364             buf.append("(uncancellable)");
365         } else if (result instanceof CauseHolder) {
366             buf.append("(failure: ")
367                     .append(((CauseHolder) result).cause)
368                     .append(')');
369         } else if (result != null) {
370             buf.append("(success: ")
371                     .append(result)
372                     .append(')');
373         } else {
374             buf.append("(incomplete)");
375         }
376 
377         return buf;
378     }
379 
380     /**
381      * Get the executor used to notify listeners when this promise is complete.
382      * <p>
383      * It is assumed this executor will protect against {@link StackOverflowError} exceptions.
384      * The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
385      * depth exceeds a threshold.
386      * @return The executor used to notify listeners when this promise is complete.
387      */
388     protected EventExecutor executor() {
389         return executor;
390     }
391 
392     protected void checkDeadLock() {
393         EventExecutor e = executor();
394         if (e != null && e.inEventLoop()) {
395             throw new BlockingOperationException(toString());
396         }
397     }
398 
    /**
     * Notify a listener that a future has completed.
     * <p>
     * Recursion is limited to {@link #MAX_LISTENER_STACK_DEPTH} nested direct notifications to prevent
     * {@link StackOverflowError}; once that depth is reached (or when the caller is not in the executor's event
     * loop) the notification is submitted to {@code eventExecutor} instead of being run on the calling stack.
     * @param eventExecutor the executor to use to notify the listener {@code listener}.
     * @param future the future that is complete.
     * @param listener the listener to notify.
     * @throws NullPointerException if any argument is {@code null} (NOTE(review): exception type assumed from
     *         the {@code checkNotNull} helper - confirm)
     */
    protected static void notifyListener(
            EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
        // Validate in declaration order so the first null argument is the one that gets reported.
        checkNotNull(eventExecutor, "eventExecutor");
        checkNotNull(future, "future");
        checkNotNull(listener, "listener");
        notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener);
    }
415 
416     private void notifyListeners() {
417         EventExecutor executor = executor();
418         if (executor.inEventLoop()) {
419             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
420             final int stackDepth = threadLocals.futureListenerStackDepth();
421             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
422                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
423                 try {
424                     notifyListenersNow();
425                 } finally {
426                     threadLocals.setFutureListenerStackDepth(stackDepth);
427                 }
428                 return;
429             }
430         }
431 
432         safeExecute(executor, new Runnable() {
433             @Override
434             public void run() {
435                 notifyListenersNow();
436             }
437         });
438     }
439 
440     /**
441      * The logic in this method should be identical to {@link #notifyListeners()} but
442      * cannot share code because the listener(s) cannot be cached for an instance of {@link DefaultPromise} since the
443      * listener(s) may be changed and is protected by a synchronized operation.
444      */
445     private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
446                                                                   final Future<?> future,
447                                                                   final GenericFutureListener<?> listener) {
448         if (executor.inEventLoop()) {
449             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
450             final int stackDepth = threadLocals.futureListenerStackDepth();
451             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
452                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
453                 try {
454                     notifyListener0(future, listener);
455                 } finally {
456                     threadLocals.setFutureListenerStackDepth(stackDepth);
457                 }
458                 return;
459             }
460         }
461 
462         safeExecute(executor, new Runnable() {
463             @Override
464             public void run() {
465                 notifyListener0(future, listener);
466             }
467         });
468     }
469 
    /**
     * Drains and notifies the registered listeners on the calling thread.
     * <p>
     * The listener storage is detached under the monitor and notified outside of it. After each batch the method
     * re-checks (again under the monitor) whether more listeners were added in the meantime and, if so, notifies
     * those too. The {@code notifyingListeners} flag makes any overlapping call return immediately, so only one
     * drain runs at a time.
     */
    private void notifyListenersNow() {
        Object listeners;
        synchronized (this) {
            // Only proceed if there are listeners to notify and we are not already notifying listeners.
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            // Take ownership of the current batch; listeners added from now on start a new batch.
            listeners = this.listeners;
            this.listeners = null;
        }
        for (;;) {
            // The storage holds either a DefaultFutureListeners collection or a single listener.
            if (listeners instanceof DefaultFutureListeners) {
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                notifyListener0(this, (GenericFutureListener<?>) listeners);
            }
            synchronized (this) {
                if (this.listeners == null) {
                    // Nothing can throw from within this method, so setting notifyingListeners back to false does not
                    // need to be in a finally block.
                    notifyingListeners = false;
                    return;
                }
                // More listeners arrived while notifying: take the next batch and go around again.
                listeners = this.listeners;
                this.listeners = null;
            }
        }
    }
499 
500     private void notifyListeners0(DefaultFutureListeners listeners) {
501         GenericFutureListener<?>[] a = listeners.listeners();
502         int size = listeners.size();
503         for (int i = 0; i < size; i ++) {
504             notifyListener0(this, a[i]);
505         }
506     }
507 
508     @SuppressWarnings({ "unchecked", "rawtypes" })
509     private static void notifyListener0(Future future, GenericFutureListener l) {
510         try {
511             l.operationComplete(future);
512         } catch (Throwable t) {
513             if (logger.isWarnEnabled()) {
514                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
515             }
516         }
517     }
518 
519     private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
520         if (listeners == null) {
521             listeners = listener;
522         } else if (listeners instanceof DefaultFutureListeners) {
523             ((DefaultFutureListeners) listeners).add(listener);
524         } else {
525             listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
526         }
527     }
528 
529     private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
530         if (listeners instanceof DefaultFutureListeners) {
531             ((DefaultFutureListeners) listeners).remove(listener);
532         } else if (listeners == listener) {
533             listeners = null;
534         }
535     }
536 
537     private boolean setSuccess0(V result) {
538         return setValue0(result == null ? SUCCESS : result);
539     }
540 
541     private boolean setFailure0(Throwable cause) {
542         return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
543     }
544 
545     private boolean setValue0(Object objResult) {
546         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
547             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
548             checkNotifyWaiters();
549             return true;
550         }
551         return false;
552     }
553 
554     private synchronized void checkNotifyWaiters() {
555         if (waiters > 0) {
556             notifyAll();
557         }
558     }
559 
560     private void incWaiters() {
561         if (waiters == Short.MAX_VALUE) {
562             throw new IllegalStateException("too many waiters: " + this);
563         }
564         ++waiters;
565     }
566 
567     private void decWaiters() {
568         --waiters;
569     }
570 
571     private void rethrowIfFailed() {
572         Throwable cause = cause();
573         if (cause == null) {
574             return;
575         }
576 
577         PlatformDependent.throwException(cause);
578     }
579 
    /**
     * Shared implementation of the timed await methods.
     *
     * @param timeoutNanos the maximum time to wait, in nanoseconds; values {@code <= 0} never block
     * @param interruptable if {@code true} an interrupt aborts the wait with an {@link InterruptedException};
     *        if {@code false} the interrupt is remembered and the thread's interrupt status is restored on return
     * @return {@code true} if the promise is complete when this method returns
     * @throws InterruptedException if {@code interruptable} is {@code true} and the thread was interrupted
     *         before or during the wait
     */
    private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
        if (isDone()) {
            return true;
        }

        // Non-positive timeout: report the current state without blocking.
        if (timeoutNanos <= 0) {
            return isDone();
        }

        // Honour (and clear) a pending interrupt before blocking.
        if (interruptable && Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        checkDeadLock();

        long startTime = System.nanoTime();
        long waitTime = timeoutNanos;
        boolean interrupted = false;
        try {
            for (;;) {
                synchronized (this) {
                    if (isDone()) {
                        return true;
                    }
                    incWaiters();
                    try {
                        // Object.wait takes milliseconds plus a 0-999999 nanosecond remainder.
                        wait(waitTime / 1000000, (int) (waitTime % 1000000));
                    } catch (InterruptedException e) {
                        if (interruptable) {
                            throw e;
                        } else {
                            interrupted = true;
                        }
                    } finally {
                        decWaiters();
                    }
                }
                if (isDone()) {
                    return true;
                } else {
                    // Woken up early: wait again only for what is left of the original timeout.
                    waitTime = timeoutNanos - (System.nanoTime() - startTime);
                    if (waitTime <= 0) {
                        return isDone();
                    }
                }
            }
        } finally {
            // Restore the interrupt status swallowed in uninterruptible mode.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
632 
633     /**
634      * Notify all progressive listeners.
635      * <p>
636      * No attempt is made to ensure notification order if multiple calls are made to this method before
637      * the original invocation completes.
638      * <p>
639      * This will do an iteration over all listeners to get all of type {@link GenericProgressiveFutureListener}s.
640      * @param progress the new progress.
641      * @param total the total progress.
642      */
643     @SuppressWarnings("unchecked")
644     void notifyProgressiveListeners(final long progress, final long total) {
645         final Object listeners = progressiveListeners();
646         if (listeners == null) {
647             return;
648         }
649 
650         final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
651 
652         EventExecutor executor = executor();
653         if (executor.inEventLoop()) {
654             if (listeners instanceof GenericProgressiveFutureListener[]) {
655                 notifyProgressiveListeners0(
656                         self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
657             } else {
658                 notifyProgressiveListener0(
659                         self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
660             }
661         } else {
662             if (listeners instanceof GenericProgressiveFutureListener[]) {
663                 final GenericProgressiveFutureListener<?>[] array =
664                         (GenericProgressiveFutureListener<?>[]) listeners;
665                 safeExecute(executor, new Runnable() {
666                     @Override
667                     public void run() {
668                         notifyProgressiveListeners0(self, array, progress, total);
669                     }
670                 });
671             } else {
672                 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
673                         (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
674                 safeExecute(executor, new Runnable() {
675                     @Override
676                     public void run() {
677                         notifyProgressiveListener0(self, l, progress, total);
678                     }
679                 });
680             }
681         }
682     }
683 
684     /**
685      * Returns a {@link GenericProgressiveFutureListener}, an array of {@link GenericProgressiveFutureListener}, or
686      * {@code null}.
687      */
688     private synchronized Object progressiveListeners() {
689         Object listeners = this.listeners;
690         if (listeners == null) {
691             // No listeners added
692             return null;
693         }
694 
695         if (listeners instanceof DefaultFutureListeners) {
696             // Copy DefaultFutureListeners into an array of listeners.
697             DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
698             int progressiveSize = dfl.progressiveSize();
699             switch (progressiveSize) {
700                 case 0:
701                     return null;
702                 case 1:
703                     for (GenericFutureListener<?> l: dfl.listeners()) {
704                         if (l instanceof GenericProgressiveFutureListener) {
705                             return l;
706                         }
707                     }
708                     return null;
709             }
710 
711             GenericFutureListener<?>[] array = dfl.listeners();
712             GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
713             for (int i = 0, j = 0; j < progressiveSize; i ++) {
714                 GenericFutureListener<?> l = array[i];
715                 if (l instanceof GenericProgressiveFutureListener) {
716                     copy[j ++] = (GenericProgressiveFutureListener<?>) l;
717                 }
718             }
719 
720             return copy;
721         } else if (listeners instanceof GenericProgressiveFutureListener) {
722             return listeners;
723         } else {
724             // Only one listener was added and it's not a progressive listener.
725             return null;
726         }
727     }
728 
729     private static void notifyProgressiveListeners0(
730             ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
731         for (GenericProgressiveFutureListener<?> l: listeners) {
732             if (l == null) {
733                 break;
734             }
735             notifyProgressiveListener0(future, l, progress, total);
736         }
737     }
738 
739     @SuppressWarnings({ "unchecked", "rawtypes" })
740     private static void notifyProgressiveListener0(
741             ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
742         try {
743             l.operationProgressed(future, progress, total);
744         } catch (Throwable t) {
745             if (logger.isWarnEnabled()) {
746                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
747             }
748         }
749     }
750 
751     private static boolean isCancelled0(Object result) {
752         return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
753     }
754 
755     private static boolean isDone0(Object result) {
756         return result != null && result != UNCANCELLABLE;
757     }
758 
    /**
     * Wrapper stored in {@code result} to mark a failed or cancelled promise, so that a failure cause can be
     * told apart from a success value.
     */
    private static final class CauseHolder {
        // The failure (or cancellation) cause.
        final Throwable cause;
        CauseHolder(Throwable cause) {
            this.cause = cause;
        }
    }
765 
766     private static void safeExecute(EventExecutor executor, Runnable task) {
767         try {
768             executor.execute(task);
769         } catch (Throwable t) {
770             rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
771         }
772     }
773 }