View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.Signal;
19  import io.netty.util.internal.InternalThreadLocalMap;
20  import io.netty.util.internal.PlatformDependent;
21  import io.netty.util.internal.StringUtil;
22  import io.netty.util.internal.SystemPropertyUtil;
23  import io.netty.util.internal.ThrowableUtil;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.util.concurrent.CancellationException;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
30  
31  import static io.netty.util.internal.ObjectUtil.checkNotNull;
32  import static java.util.concurrent.TimeUnit.MILLISECONDS;
33  
34  public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    /** Logger used to report exceptions thrown by listeners while they are being notified. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
    /** Separate logger (suffix {@code .rejectedExecution}) used when a notification task cannot be submitted. */
    private static final InternalLogger rejectedExecutionLogger =
            InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
    /**
     * Maximum depth of nested in-line listener notifications on the event loop thread before the notification is
     * handed to the executor as a task instead. Because of the {@code Math.min(8, ...)} the system property
     * {@code io.netty.defaultPromise.maxListenerStackDepth} can lower this value but never raise it above 8.
     */
    private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
            SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
    /** Atomic updater for the {@link #result} field; all state transitions go through compare-and-set on it. */
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    /** Sentinel stored in {@link #result} when the promise succeeded with a {@code null} value. */
    private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");
    /** Sentinel stored in {@link #result} while the promise is still incomplete but may no longer be cancelled. */
    private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
    /** Shared holder wrapping a {@link CancellationException}; stored in {@link #result} by {@code cancel(...)}. */
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
            new CancellationException(), DefaultPromise.class, "cancel(...)"));

    /**
     * Current state of the promise: {@code null} (incomplete), {@link #UNCANCELLABLE} (incomplete, cancellation
     * disabled), {@link #SUCCESS} (succeeded with {@code null}), a {@link CauseHolder} (failed or cancelled), or
     * any other object (the successful result).
     */
    private volatile Object result;
    /** Executor used to notify listeners; {@code null} when the protected no-arg constructor was used. */
    private final EventExecutor executor;
    /**
     * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
     * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
     *
     * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
     */
    private Object listeners;
    /**
     * Number of threads currently blocked in one of the {@code await*} methods.
     *
     * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
     */
    private short waiters;

    /**
     * {@code true} while a thread is inside the listener notification loop.
     *
     * Threading - synchronized(this). We must prevent concurrent notification; the original authors also cite
     * keeping listener notification in FIFO order if the executor changes.
     */
    private boolean notifyingListeners;
67  
68      /**
69       * Creates a new instance.
70       *
71       * It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
72       *
73       * @param executor
74       *        the {@link EventExecutor} which is used to notify the promise once it is complete.
75       *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
76       *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
77       *        depth exceeds a threshold.
78       *
79       */
80      public DefaultPromise(EventExecutor executor) {
81          this.executor = checkNotNull(executor, "executor");
82      }
83  
84      /**
85       * See {@link #executor()} for expectations of the executor.
86       */
87      protected DefaultPromise() {
88          // only for subclasses
89          executor = null;
90      }
91  
92      @Override
93      public Promise<V> setSuccess(V result) {
94          if (setSuccess0(result)) {
95              notifyListeners();
96              return this;
97          }
98          throw new IllegalStateException("complete already: " + this);
99      }
100 
101     @Override
102     public boolean trySuccess(V result) {
103         if (setSuccess0(result)) {
104             notifyListeners();
105             return true;
106         }
107         return false;
108     }
109 
110     @Override
111     public Promise<V> setFailure(Throwable cause) {
112         if (setFailure0(cause)) {
113             notifyListeners();
114             return this;
115         }
116         throw new IllegalStateException("complete already: " + this, cause);
117     }
118 
119     @Override
120     public boolean tryFailure(Throwable cause) {
121         if (setFailure0(cause)) {
122             notifyListeners();
123             return true;
124         }
125         return false;
126     }
127 
128     @Override
129     public boolean setUncancellable() {
130         if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
131             return true;
132         }
133         Object result = this.result;
134         return !isDone0(result) || !isCancelled0(result);
135     }
136 
137     @Override
138     public boolean isSuccess() {
139         Object result = this.result;
140         return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
141     }
142 
143     @Override
144     public boolean isCancellable() {
145         return result == null;
146     }
147 
148     @Override
149     public Throwable cause() {
150         Object result = this.result;
151         return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
152     }
153 
154     @Override
155     public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
156         checkNotNull(listener, "listener");
157 
158         synchronized (this) {
159             addListener0(listener);
160         }
161 
162         if (isDone()) {
163             notifyListeners();
164         }
165 
166         return this;
167     }
168 
169     @Override
170     public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
171         checkNotNull(listeners, "listeners");
172 
173         synchronized (this) {
174             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
175                 if (listener == null) {
176                     break;
177                 }
178                 addListener0(listener);
179             }
180         }
181 
182         if (isDone()) {
183             notifyListeners();
184         }
185 
186         return this;
187     }
188 
189     @Override
190     public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
191         checkNotNull(listener, "listener");
192 
193         synchronized (this) {
194             removeListener0(listener);
195         }
196 
197         return this;
198     }
199 
200     @Override
201     public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
202         checkNotNull(listeners, "listeners");
203 
204         synchronized (this) {
205             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
206                 if (listener == null) {
207                     break;
208                 }
209                 removeListener0(listener);
210             }
211         }
212 
213         return this;
214     }
215 
    /**
     * Blocks the calling thread until this promise is complete.
     *
     * @return this promise
     * @throws InterruptedException if the thread was already interrupted on entry (the interrupt flag is cleared
     *         by {@link Thread#interrupted()}), or is interrupted while waiting
     */
    @Override
    public Promise<V> await() throws InterruptedException {
        // Fast path: nothing to wait for.
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        // Refuse to block when called from the executor's own event loop thread.
        checkDeadLock();

        synchronized (this) {
            // Re-check under the monitor; the loop also absorbs wake-ups that occur before completion.
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } finally {
                    // Always undo the waiter count, even when wait() throws.
                    decWaiters();
                }
            }
        }
        return this;
    }
240 
    /**
     * Blocks the calling thread until this promise is complete, ignoring interrupts while waiting. If an
     * interrupt was received during the wait, the thread's interrupt flag is restored before returning.
     *
     * @return this promise
     */
    @Override
    public Promise<V> awaitUninterruptibly() {
        // Fast path: nothing to wait for.
        if (isDone()) {
            return this;
        }

        // Refuse to block when called from the executor's own event loop thread.
        checkDeadLock();

        boolean interrupted = false;
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Interrupted while waiting.
                    interrupted = true;
                } finally {
                    // Always undo the waiter count, even when wait() throws.
                    decWaiters();
                }
            }
        }

        // Re-assert the interrupt so the caller can still observe it.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        return this;
    }
270 
271     @Override
272     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
273         return await0(unit.toNanos(timeout), true);
274     }
275 
276     @Override
277     public boolean await(long timeoutMillis) throws InterruptedException {
278         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
279     }
280 
281     @Override
282     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
283         try {
284             return await0(unit.toNanos(timeout), false);
285         } catch (InterruptedException e) {
286             // Should not be raised at all.
287             throw new InternalError();
288         }
289     }
290 
291     @Override
292     public boolean awaitUninterruptibly(long timeoutMillis) {
293         try {
294             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
295         } catch (InterruptedException e) {
296             // Should not be raised at all.
297             throw new InternalError();
298         }
299     }
300 
301     @SuppressWarnings("unchecked")
302     @Override
303     public V getNow() {
304         Object result = this.result;
305         if (result instanceof CauseHolder || result == SUCCESS) {
306             return null;
307         }
308         return (V) result;
309     }
310 
311     @Override
312     public boolean cancel(boolean mayInterruptIfRunning) {
313         if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
314             checkNotifyWaiters();
315             notifyListeners();
316             return true;
317         }
318         return false;
319     }
320 
321     @Override
322     public boolean isCancelled() {
323         return isCancelled0(result);
324     }
325 
326     @Override
327     public boolean isDone() {
328         return isDone0(result);
329     }
330 
331     @Override
332     public Promise<V> sync() throws InterruptedException {
333         await();
334         rethrowIfFailed();
335         return this;
336     }
337 
338     @Override
339     public Promise<V> syncUninterruptibly() {
340         awaitUninterruptibly();
341         rethrowIfFailed();
342         return this;
343     }
344 
345     @Override
346     public String toString() {
347         return toStringBuilder().toString();
348     }
349 
350     protected StringBuilder toStringBuilder() {
351         StringBuilder buf = new StringBuilder(64)
352                 .append(StringUtil.simpleClassName(this))
353                 .append('@')
354                 .append(Integer.toHexString(hashCode()));
355 
356         Object result = this.result;
357         if (result == SUCCESS) {
358             buf.append("(success)");
359         } else if (result == UNCANCELLABLE) {
360             buf.append("(uncancellable)");
361         } else if (result instanceof CauseHolder) {
362             buf.append("(failure: ")
363                     .append(((CauseHolder) result).cause)
364                     .append(')');
365         } else if (result != null) {
366             buf.append("(success: ")
367                     .append(result)
368                     .append(')');
369         } else {
370             buf.append("(incomplete)");
371         }
372 
373         return buf;
374     }
375 
376     /**
377      * Get the executor used to notify listeners when this promise is complete.
378      * <p>
379      * It is assumed this executor will protect against {@link StackOverflowError} exceptions.
380      * The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
381      * depth exceeds a threshold.
382      * @return The executor used to notify listeners when this promise is complete.
383      */
384     protected EventExecutor executor() {
385         return executor;
386     }
387 
388     protected void checkDeadLock() {
389         EventExecutor e = executor();
390         if (e != null && e.inEventLoop()) {
391             throw new BlockingOperationException(toString());
392         }
393     }
394 
395     /**
396      * Notify a listener that a future has completed.
397      * <p>
398      * This method has a fixed depth of {@link #MAX_LISTENER_STACK_DEPTH} that will limit recursion to prevent
399      * {@link StackOverflowError} and will stop notifying listeners added after this threshold is exceeded.
400      * @param eventExecutor the executor to use to notify the listener {@code listener}.
401      * @param future the future that is complete.
402      * @param listener the listener to notify.
403      */
404     protected static void notifyListener(
405             EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
406         checkNotNull(eventExecutor, "eventExecutor");
407         checkNotNull(future, "future");
408         checkNotNull(listener, "listener");
409         notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener);
410     }
411 
412     private void notifyListeners() {
413         EventExecutor executor = executor();
414         if (executor.inEventLoop()) {
415             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
416             final int stackDepth = threadLocals.futureListenerStackDepth();
417             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
418                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
419                 try {
420                     notifyListenersNow();
421                 } finally {
422                     threadLocals.setFutureListenerStackDepth(stackDepth);
423                 }
424                 return;
425             }
426         }
427 
428         safeExecute(executor, new Runnable() {
429             @Override
430             public void run() {
431                 notifyListenersNow();
432             }
433         });
434     }
435 
436     /**
437      * The logic in this method should be identical to {@link #notifyListeners()} but
438      * cannot share code because the listener(s) cannot be cached for an instance of {@link DefaultPromise} since the
439      * listener(s) may be changed and is protected by a synchronized operation.
440      */
441     private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
442                                                                   final Future<?> future,
443                                                                   final GenericFutureListener<?> listener) {
444         if (executor.inEventLoop()) {
445             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
446             final int stackDepth = threadLocals.futureListenerStackDepth();
447             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
448                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
449                 try {
450                     notifyListener0(future, listener);
451                 } finally {
452                     threadLocals.setFutureListenerStackDepth(stackDepth);
453                 }
454                 return;
455             }
456         }
457 
458         safeExecute(executor, new Runnable() {
459             @Override
460             public void run() {
461                 notifyListener0(future, listener);
462             }
463         });
464     }
465 
    /**
     * Drains and notifies the registered listeners on the calling thread.
     * <p>
     * Listeners are detached from {@link #listeners} under the monitor and invoked outside of it. After each
     * batch the field is re-checked, so listeners added while a batch was being notified are picked up by the
     * same call. The {@link #notifyingListeners} flag makes a concurrent or re-entrant call return immediately.
     */
    private void notifyListenersNow() {
        Object listeners;
        synchronized (this) {
            // Only proceed if there are listeners to notify and we are not already notifying listeners.
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            // Take ownership of the current batch and clear the field so new additions form the next batch.
            listeners = this.listeners;
            this.listeners = null;
        }
        for (;;) {
            // Invoke the batch without holding the monitor.
            if (listeners instanceof DefaultFutureListeners) {
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                notifyListener0(this, (GenericFutureListener<?>) listeners);
            }
            synchronized (this) {
                if (this.listeners == null) {
                    // Nothing can throw from within this method, so setting notifyingListeners back to false does not
                    // need to be in a finally block.
                    notifyingListeners = false;
                    return;
                }
                // More listeners were added meanwhile: take them as the next batch.
                listeners = this.listeners;
                this.listeners = null;
            }
        }
    }
495 
496     private void notifyListeners0(DefaultFutureListeners listeners) {
497         GenericFutureListener<?>[] a = listeners.listeners();
498         int size = listeners.size();
499         for (int i = 0; i < size; i ++) {
500             notifyListener0(this, a[i]);
501         }
502     }
503 
504     @SuppressWarnings({ "unchecked", "rawtypes" })
505     private static void notifyListener0(Future future, GenericFutureListener l) {
506         try {
507             l.operationComplete(future);
508         } catch (Throwable t) {
509             logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
510         }
511     }
512 
513     private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
514         if (listeners == null) {
515             listeners = listener;
516         } else if (listeners instanceof DefaultFutureListeners) {
517             ((DefaultFutureListeners) listeners).add(listener);
518         } else {
519             listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
520         }
521     }
522 
523     private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
524         if (listeners instanceof DefaultFutureListeners) {
525             ((DefaultFutureListeners) listeners).remove(listener);
526         } else if (listeners == listener) {
527             listeners = null;
528         }
529     }
530 
531     private boolean setSuccess0(V result) {
532         return setValue0(result == null ? SUCCESS : result);
533     }
534 
535     private boolean setFailure0(Throwable cause) {
536         return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
537     }
538 
539     private boolean setValue0(Object objResult) {
540         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
541             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
542             checkNotifyWaiters();
543             return true;
544         }
545         return false;
546     }
547 
548     private synchronized void checkNotifyWaiters() {
549         if (waiters > 0) {
550             notifyAll();
551         }
552     }
553 
554     private void incWaiters() {
555         if (waiters == Short.MAX_VALUE) {
556             throw new IllegalStateException("too many waiters: " + this);
557         }
558         ++waiters;
559     }
560 
561     private void decWaiters() {
562         --waiters;
563     }
564 
565     private void rethrowIfFailed() {
566         Throwable cause = cause();
567         if (cause == null) {
568             return;
569         }
570 
571         PlatformDependent.throwException(cause);
572     }
573 
    /**
     * Shared implementation of the timed {@code await*} methods.
     *
     * @param timeoutNanos maximum time to wait, in nanoseconds; values {@code <= 0} do not block
     * @param interruptable if {@code true}, an interrupt aborts the wait with {@link InterruptedException};
     *        if {@code false}, interrupts are remembered and the interrupt flag is restored before returning
     * @return {@code true} if the promise is complete on return, {@code false} if the timeout elapsed first
     * @throws InterruptedException only when {@code interruptable} is {@code true}
     */
    private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
        // Fast path: already complete.
        if (isDone()) {
            return true;
        }

        // Non-positive timeout: report the current state without blocking.
        if (timeoutNanos <= 0) {
            return isDone();
        }

        // Thread.interrupted() clears the flag; it is only consulted in interruptible mode.
        if (interruptable && Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        // Refuse to block when called from the executor's own event loop thread.
        checkDeadLock();

        long startTime = System.nanoTime();
        long waitTime = timeoutNanos;
        boolean interrupted = false;
        try {
            for (;;) {
                synchronized (this) {
                    // Re-check under the monitor before committing to wait.
                    if (isDone()) {
                        return true;
                    }
                    incWaiters();
                    try {
                        // Object.wait takes whole milliseconds plus a 0-999999 nanosecond remainder.
                        wait(waitTime / 1000000, (int) (waitTime % 1000000));
                    } catch (InterruptedException e) {
                        if (interruptable) {
                            throw e;
                        } else {
                            interrupted = true;
                        }
                    } finally {
                        // Always undo the waiter count, even when wait() throws.
                        decWaiters();
                    }
                }
                if (isDone()) {
                    return true;
                } else {
                    // Woken up without completion: compute the remaining budget from the original start time.
                    waitTime = timeoutNanos - (System.nanoTime() - startTime);
                    if (waitTime <= 0) {
                        return isDone();
                    }
                }
            }
        } finally {
            // Re-assert an interrupt that was swallowed in uninterruptible mode.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
626 
627     /**
628      * Notify all progressive listeners.
629      * <p>
630      * No attempt is made to ensure notification order if multiple calls are made to this method before
631      * the original invocation completes.
632      * <p>
633      * This will do an iteration over all listeners to get all of type {@link GenericProgressiveFutureListener}s.
634      * @param progress the new progress.
635      * @param total the total progress.
636      */
637     @SuppressWarnings("unchecked")
638     void notifyProgressiveListeners(final long progress, final long total) {
639         final Object listeners = progressiveListeners();
640         if (listeners == null) {
641             return;
642         }
643 
644         final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
645 
646         EventExecutor executor = executor();
647         if (executor.inEventLoop()) {
648             if (listeners instanceof GenericProgressiveFutureListener[]) {
649                 notifyProgressiveListeners0(
650                         self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
651             } else {
652                 notifyProgressiveListener0(
653                         self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
654             }
655         } else {
656             if (listeners instanceof GenericProgressiveFutureListener[]) {
657                 final GenericProgressiveFutureListener<?>[] array =
658                         (GenericProgressiveFutureListener<?>[]) listeners;
659                 safeExecute(executor, new Runnable() {
660                     @Override
661                     public void run() {
662                         notifyProgressiveListeners0(self, array, progress, total);
663                     }
664                 });
665             } else {
666                 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
667                         (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
668                 safeExecute(executor, new Runnable() {
669                     @Override
670                     public void run() {
671                         notifyProgressiveListener0(self, l, progress, total);
672                     }
673                 });
674             }
675         }
676     }
677 
678     /**
679      * Returns a {@link GenericProgressiveFutureListener}, an array of {@link GenericProgressiveFutureListener}, or
680      * {@code null}.
681      */
682     private synchronized Object progressiveListeners() {
683         Object listeners = this.listeners;
684         if (listeners == null) {
685             // No listeners added
686             return null;
687         }
688 
689         if (listeners instanceof DefaultFutureListeners) {
690             // Copy DefaultFutureListeners into an array of listeners.
691             DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
692             int progressiveSize = dfl.progressiveSize();
693             switch (progressiveSize) {
694                 case 0:
695                     return null;
696                 case 1:
697                     for (GenericFutureListener<?> l: dfl.listeners()) {
698                         if (l instanceof GenericProgressiveFutureListener) {
699                             return l;
700                         }
701                     }
702                     return null;
703             }
704 
705             GenericFutureListener<?>[] array = dfl.listeners();
706             GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
707             for (int i = 0, j = 0; j < progressiveSize; i ++) {
708                 GenericFutureListener<?> l = array[i];
709                 if (l instanceof GenericProgressiveFutureListener) {
710                     copy[j ++] = (GenericProgressiveFutureListener<?>) l;
711                 }
712             }
713 
714             return copy;
715         } else if (listeners instanceof GenericProgressiveFutureListener) {
716             return listeners;
717         } else {
718             // Only one listener was added and it's not a progressive listener.
719             return null;
720         }
721     }
722 
723     private static void notifyProgressiveListeners0(
724             ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
725         for (GenericProgressiveFutureListener<?> l: listeners) {
726             if (l == null) {
727                 break;
728             }
729             notifyProgressiveListener0(future, l, progress, total);
730         }
731     }
732 
733     @SuppressWarnings({ "unchecked", "rawtypes" })
734     private static void notifyProgressiveListener0(
735             ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
736         try {
737             l.operationProgressed(future, progress, total);
738         } catch (Throwable t) {
739             logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
740         }
741     }
742 
743     private static boolean isCancelled0(Object result) {
744         return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
745     }
746 
747     private static boolean isDone0(Object result) {
748         return result != null && result != UNCANCELLABLE;
749     }
750 
    /**
     * Wrapper stored in {@link #result} to mark a failed (or cancelled) promise. Wrapping the {@link Throwable}
     * lets the state checks tell a failure apart from a successful result with a plain {@code instanceof} test.
     */
    private static final class CauseHolder {
        /** The failure recorded for the promise. */
        final Throwable cause;
        CauseHolder(Throwable cause) {
            this.cause = cause;
        }
    }
757 
758     private static void safeExecute(EventExecutor executor, Runnable task) {
759         try {
760             executor.execute(task);
761         } catch (Throwable t) {
762             rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
763         }
764     }
765 }