View Javadoc

1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.Signal;
19  import io.netty.util.internal.InternalThreadLocalMap;
20  import io.netty.util.internal.PlatformDependent;
21  import io.netty.util.internal.StringUtil;
22  import io.netty.util.internal.SystemPropertyUtil;
23  import io.netty.util.internal.ThrowableUtil;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.util.concurrent.CancellationException;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
30  
31  import static io.netty.util.internal.ObjectUtil.checkNotNull;
32  import static java.util.concurrent.TimeUnit.MILLISECONDS;
33  
34  public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
35      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
36      private static final InternalLogger rejectedExecutionLogger =
37              InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
38      private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
39              SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
40      @SuppressWarnings("rawtypes")
41      private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
42              AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
43      private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
44      private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class.getName() + ".UNCANCELLABLE");
45      private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
46              new CancellationException(), DefaultPromise.class, "cancel(...)"));
47  
48      private volatile Object result;
49      private final EventExecutor executor;
50      /**
51       * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
52       * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
53       *
54       * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
55       */
56      private Object listeners;
57      /**
58       * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
59       */
60      private short waiters;
61  
62      /**
63       * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
64       * executor changes.
65       */
66      private boolean notifyingListeners;
67  
68      /**
69       * Creates a new instance.
70       *
71       * It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
72       *
73       * @param executor
74       *        the {@link EventExecutor} which is used to notify the promise once it is complete.
75       *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
76       *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
77       *        depth exceeds a threshold.
78       *
79       */
80      public DefaultPromise(EventExecutor executor) {
81          this.executor = checkNotNull(executor, "executor");
82      }
83  
84      /**
85       * See {@link #executor()} for expectations of the executor.
86       */
87      protected DefaultPromise() {
88          // only for subclasses
89          executor = null;
90      }
91  
92      @Override
93      public Promise<V> setSuccess(V result) {
94          if (setSuccess0(result)) {
95              notifyListeners();
96              return this;
97          }
98          throw new IllegalStateException("complete already: " + this);
99      }
100 
101     @Override
102     public boolean trySuccess(V result) {
103         if (setSuccess0(result)) {
104             notifyListeners();
105             return true;
106         }
107         return false;
108     }
109 
110     @Override
111     public Promise<V> setFailure(Throwable cause) {
112         if (setFailure0(cause)) {
113             notifyListeners();
114             return this;
115         }
116         throw new IllegalStateException("complete already: " + this, cause);
117     }
118 
119     @Override
120     public boolean tryFailure(Throwable cause) {
121         if (setFailure0(cause)) {
122             notifyListeners();
123             return true;
124         }
125         return false;
126     }
127 
128     @Override
129     public boolean setUncancellable() {
130         if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
131             return true;
132         }
133         Object result = this.result;
134         return !isDone0(result) || !isCancelled0(result);
135     }
136 
137     @Override
138     public boolean isSuccess() {
139         Object result = this.result;
140         return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
141     }
142 
143     @Override
144     public boolean isCancellable() {
145         return result == null;
146     }
147 
148     @Override
149     public Throwable cause() {
150         Object result = this.result;
151         return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
152     }
153 
154     @Override
155     public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
156         checkNotNull(listener, "listener");
157 
158         synchronized (this) {
159             addListener0(listener);
160         }
161 
162         if (isDone()) {
163             notifyListeners();
164         }
165 
166         return this;
167     }
168 
169     @Override
170     public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
171         checkNotNull(listeners, "listeners");
172 
173         synchronized (this) {
174             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
175                 if (listener == null) {
176                     break;
177                 }
178                 addListener0(listener);
179             }
180         }
181 
182         if (isDone()) {
183             notifyListeners();
184         }
185 
186         return this;
187     }
188 
189     @Override
190     public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
191         checkNotNull(listener, "listener");
192 
193         synchronized (this) {
194             removeListener0(listener);
195         }
196 
197         return this;
198     }
199 
200     @Override
201     public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
202         checkNotNull(listeners, "listeners");
203 
204         synchronized (this) {
205             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
206                 if (listener == null) {
207                     break;
208                 }
209                 removeListener0(listener);
210             }
211         }
212 
213         return this;
214     }
215 
216     @Override
217     public Promise<V> await() throws InterruptedException {
218         if (isDone()) {
219             return this;
220         }
221 
222         if (Thread.interrupted()) {
223             throw new InterruptedException(toString());
224         }
225 
226         checkDeadLock();
227 
228         synchronized (this) {
229             while (!isDone()) {
230                 incWaiters();
231                 try {
232                     wait();
233                 } finally {
234                     decWaiters();
235                 }
236             }
237         }
238         return this;
239     }
240 
241     @Override
242     public Promise<V> awaitUninterruptibly() {
243         if (isDone()) {
244             return this;
245         }
246 
247         checkDeadLock();
248 
249         boolean interrupted = false;
250         synchronized (this) {
251             while (!isDone()) {
252                 incWaiters();
253                 try {
254                     wait();
255                 } catch (InterruptedException e) {
256                     // Interrupted while waiting.
257                     interrupted = true;
258                 } finally {
259                     decWaiters();
260                 }
261             }
262         }
263 
264         if (interrupted) {
265             Thread.currentThread().interrupt();
266         }
267 
268         return this;
269     }
270 
271     @Override
272     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
273         return await0(unit.toNanos(timeout), true);
274     }
275 
276     @Override
277     public boolean await(long timeoutMillis) throws InterruptedException {
278         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
279     }
280 
281     @Override
282     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
283         try {
284             return await0(unit.toNanos(timeout), false);
285         } catch (InterruptedException e) {
286             // Should not be raised at all.
287             throw new InternalError();
288         }
289     }
290 
291     @Override
292     public boolean awaitUninterruptibly(long timeoutMillis) {
293         try {
294             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
295         } catch (InterruptedException e) {
296             // Should not be raised at all.
297             throw new InternalError();
298         }
299     }
300 
301     @SuppressWarnings("unchecked")
302     @Override
303     public V getNow() {
304         Object result = this.result;
305         if (result instanceof CauseHolder || result == SUCCESS) {
306             return null;
307         }
308         return (V) result;
309     }
310 
311     /**
312      * {@inheritDoc}
313      *
314      * @param mayInterruptIfRunning this value has no effect in this implementation.
315      */
316     @Override
317     public boolean cancel(boolean mayInterruptIfRunning) {
318         if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
319             checkNotifyWaiters();
320             notifyListeners();
321             return true;
322         }
323         return false;
324     }
325 
326     @Override
327     public boolean isCancelled() {
328         return isCancelled0(result);
329     }
330 
331     @Override
332     public boolean isDone() {
333         return isDone0(result);
334     }
335 
336     @Override
337     public Promise<V> sync() throws InterruptedException {
338         await();
339         rethrowIfFailed();
340         return this;
341     }
342 
343     @Override
344     public Promise<V> syncUninterruptibly() {
345         awaitUninterruptibly();
346         rethrowIfFailed();
347         return this;
348     }
349 
350     @Override
351     public String toString() {
352         return toStringBuilder().toString();
353     }
354 
355     protected StringBuilder toStringBuilder() {
356         StringBuilder buf = new StringBuilder(64)
357                 .append(StringUtil.simpleClassName(this))
358                 .append('@')
359                 .append(Integer.toHexString(hashCode()));
360 
361         Object result = this.result;
362         if (result == SUCCESS) {
363             buf.append("(success)");
364         } else if (result == UNCANCELLABLE) {
365             buf.append("(uncancellable)");
366         } else if (result instanceof CauseHolder) {
367             buf.append("(failure: ")
368                     .append(((CauseHolder) result).cause)
369                     .append(')');
370         } else if (result != null) {
371             buf.append("(success: ")
372                     .append(result)
373                     .append(')');
374         } else {
375             buf.append("(incomplete)");
376         }
377 
378         return buf;
379     }
380 
381     /**
382      * Get the executor used to notify listeners when this promise is complete.
383      * <p>
384      * It is assumed this executor will protect against {@link StackOverflowError} exceptions.
385      * The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
386      * depth exceeds a threshold.
387      * @return The executor used to notify listeners when this promise is complete.
388      */
389     protected EventExecutor executor() {
390         return executor;
391     }
392 
393     protected void checkDeadLock() {
394         EventExecutor e = executor();
395         if (e != null && e.inEventLoop()) {
396             throw new BlockingOperationException(toString());
397         }
398     }
399 
400     /**
401      * Notify a listener that a future has completed.
402      * <p>
403      * This method has a fixed depth of {@link #MAX_LISTENER_STACK_DEPTH} that will limit recursion to prevent
404      * {@link StackOverflowError} and will stop notifying listeners added after this threshold is exceeded.
405      * @param eventExecutor the executor to use to notify the listener {@code listener}.
406      * @param future the future that is complete.
407      * @param listener the listener to notify.
408      */
409     protected static void notifyListener(
410             EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
411         checkNotNull(eventExecutor, "eventExecutor");
412         checkNotNull(future, "future");
413         checkNotNull(listener, "listener");
414         notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener);
415     }
416 
417     private void notifyListeners() {
418         EventExecutor executor = executor();
419         if (executor.inEventLoop()) {
420             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
421             final int stackDepth = threadLocals.futureListenerStackDepth();
422             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
423                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
424                 try {
425                     notifyListenersNow();
426                 } finally {
427                     threadLocals.setFutureListenerStackDepth(stackDepth);
428                 }
429                 return;
430             }
431         }
432 
433         safeExecute(executor, new Runnable() {
434             @Override
435             public void run() {
436                 notifyListenersNow();
437             }
438         });
439     }
440 
441     /**
442      * The logic in this method should be identical to {@link #notifyListeners()} but
443      * cannot share code because the listener(s) cannot be cached for an instance of {@link DefaultPromise} since the
444      * listener(s) may be changed and is protected by a synchronized operation.
445      */
446     private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
447                                                                   final Future<?> future,
448                                                                   final GenericFutureListener<?> listener) {
449         if (executor.inEventLoop()) {
450             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
451             final int stackDepth = threadLocals.futureListenerStackDepth();
452             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
453                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
454                 try {
455                     notifyListener0(future, listener);
456                 } finally {
457                     threadLocals.setFutureListenerStackDepth(stackDepth);
458                 }
459                 return;
460             }
461         }
462 
463         safeExecute(executor, new Runnable() {
464             @Override
465             public void run() {
466                 notifyListener0(future, listener);
467             }
468         });
469     }
470 
471     private void notifyListenersNow() {
472         Object listeners;
473         synchronized (this) {
474             // Only proceed if there are listeners to notify and we are not already notifying listeners.
475             if (notifyingListeners || this.listeners == null) {
476                 return;
477             }
478             notifyingListeners = true;
479             listeners = this.listeners;
480             this.listeners = null;
481         }
482         for (;;) {
483             if (listeners instanceof DefaultFutureListeners) {
484                 notifyListeners0((DefaultFutureListeners) listeners);
485             } else {
486                 notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
487             }
488             synchronized (this) {
489                 if (this.listeners == null) {
490                     // Nothing can throw from within this method, so setting notifyingListeners back to false does not
491                     // need to be in a finally block.
492                     notifyingListeners = false;
493                     return;
494                 }
495                 listeners = this.listeners;
496                 this.listeners = null;
497             }
498         }
499     }
500 
501     private void notifyListeners0(DefaultFutureListeners listeners) {
502         GenericFutureListener<?>[] a = listeners.listeners();
503         int size = listeners.size();
504         for (int i = 0; i < size; i ++) {
505             notifyListener0(this, a[i]);
506         }
507     }
508 
509     @SuppressWarnings({ "unchecked", "rawtypes" })
510     private static void notifyListener0(Future future, GenericFutureListener l) {
511         try {
512             l.operationComplete(future);
513         } catch (Throwable t) {
514             logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
515         }
516     }
517 
518     private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
519         if (listeners == null) {
520             listeners = listener;
521         } else if (listeners instanceof DefaultFutureListeners) {
522             ((DefaultFutureListeners) listeners).add(listener);
523         } else {
524             listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
525         }
526     }
527 
528     private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
529         if (listeners instanceof DefaultFutureListeners) {
530             ((DefaultFutureListeners) listeners).remove(listener);
531         } else if (listeners == listener) {
532             listeners = null;
533         }
534     }
535 
536     private boolean setSuccess0(V result) {
537         return setValue0(result == null ? SUCCESS : result);
538     }
539 
540     private boolean setFailure0(Throwable cause) {
541         return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
542     }
543 
544     private boolean setValue0(Object objResult) {
545         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
546             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
547             checkNotifyWaiters();
548             return true;
549         }
550         return false;
551     }
552 
553     private synchronized void checkNotifyWaiters() {
554         if (waiters > 0) {
555             notifyAll();
556         }
557     }
558 
559     private void incWaiters() {
560         if (waiters == Short.MAX_VALUE) {
561             throw new IllegalStateException("too many waiters: " + this);
562         }
563         ++waiters;
564     }
565 
566     private void decWaiters() {
567         --waiters;
568     }
569 
570     private void rethrowIfFailed() {
571         Throwable cause = cause();
572         if (cause == null) {
573             return;
574         }
575 
576         PlatformDependent.throwException(cause);
577     }
578 
579     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
580         if (isDone()) {
581             return true;
582         }
583 
584         if (timeoutNanos <= 0) {
585             return isDone();
586         }
587 
588         if (interruptable && Thread.interrupted()) {
589             throw new InterruptedException(toString());
590         }
591 
592         checkDeadLock();
593 
594         long startTime = System.nanoTime();
595         long waitTime = timeoutNanos;
596         boolean interrupted = false;
597         try {
598             for (;;) {
599                 synchronized (this) {
600                     if (isDone()) {
601                         return true;
602                     }
603                     incWaiters();
604                     try {
605                         wait(waitTime / 1000000, (int) (waitTime % 1000000));
606                     } catch (InterruptedException e) {
607                         if (interruptable) {
608                             throw e;
609                         } else {
610                             interrupted = true;
611                         }
612                     } finally {
613                         decWaiters();
614                     }
615                 }
616                 if (isDone()) {
617                     return true;
618                 } else {
619                     waitTime = timeoutNanos - (System.nanoTime() - startTime);
620                     if (waitTime <= 0) {
621                         return isDone();
622                     }
623                 }
624             }
625         } finally {
626             if (interrupted) {
627                 Thread.currentThread().interrupt();
628             }
629         }
630     }
631 
632     /**
633      * Notify all progressive listeners.
634      * <p>
635      * No attempt is made to ensure notification order if multiple calls are made to this method before
636      * the original invocation completes.
637      * <p>
638      * This will do an iteration over all listeners to get all of type {@link GenericProgressiveFutureListener}s.
639      * @param progress the new progress.
640      * @param total the total progress.
641      */
642     @SuppressWarnings("unchecked")
643     void notifyProgressiveListeners(final long progress, final long total) {
644         final Object listeners = progressiveListeners();
645         if (listeners == null) {
646             return;
647         }
648 
649         final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
650 
651         EventExecutor executor = executor();
652         if (executor.inEventLoop()) {
653             if (listeners instanceof GenericProgressiveFutureListener[]) {
654                 notifyProgressiveListeners0(
655                         self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
656             } else {
657                 notifyProgressiveListener0(
658                         self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
659             }
660         } else {
661             if (listeners instanceof GenericProgressiveFutureListener[]) {
662                 final GenericProgressiveFutureListener<?>[] array =
663                         (GenericProgressiveFutureListener<?>[]) listeners;
664                 safeExecute(executor, new Runnable() {
665                     @Override
666                     public void run() {
667                         notifyProgressiveListeners0(self, array, progress, total);
668                     }
669                 });
670             } else {
671                 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
672                         (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
673                 safeExecute(executor, new Runnable() {
674                     @Override
675                     public void run() {
676                         notifyProgressiveListener0(self, l, progress, total);
677                     }
678                 });
679             }
680         }
681     }
682 
683     /**
684      * Returns a {@link GenericProgressiveFutureListener}, an array of {@link GenericProgressiveFutureListener}, or
685      * {@code null}.
686      */
687     private synchronized Object progressiveListeners() {
688         Object listeners = this.listeners;
689         if (listeners == null) {
690             // No listeners added
691             return null;
692         }
693 
694         if (listeners instanceof DefaultFutureListeners) {
695             // Copy DefaultFutureListeners into an array of listeners.
696             DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
697             int progressiveSize = dfl.progressiveSize();
698             switch (progressiveSize) {
699                 case 0:
700                     return null;
701                 case 1:
702                     for (GenericFutureListener<?> l: dfl.listeners()) {
703                         if (l instanceof GenericProgressiveFutureListener) {
704                             return l;
705                         }
706                     }
707                     return null;
708             }
709 
710             GenericFutureListener<?>[] array = dfl.listeners();
711             GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
712             for (int i = 0, j = 0; j < progressiveSize; i ++) {
713                 GenericFutureListener<?> l = array[i];
714                 if (l instanceof GenericProgressiveFutureListener) {
715                     copy[j ++] = (GenericProgressiveFutureListener<?>) l;
716                 }
717             }
718 
719             return copy;
720         } else if (listeners instanceof GenericProgressiveFutureListener) {
721             return listeners;
722         } else {
723             // Only one listener was added and it's not a progressive listener.
724             return null;
725         }
726     }
727 
728     private static void notifyProgressiveListeners0(
729             ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
730         for (GenericProgressiveFutureListener<?> l: listeners) {
731             if (l == null) {
732                 break;
733             }
734             notifyProgressiveListener0(future, l, progress, total);
735         }
736     }
737 
738     @SuppressWarnings({ "unchecked", "rawtypes" })
739     private static void notifyProgressiveListener0(
740             ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
741         try {
742             l.operationProgressed(future, progress, total);
743         } catch (Throwable t) {
744             logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
745         }
746     }
747 
748     private static boolean isCancelled0(Object result) {
749         return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
750     }
751 
752     private static boolean isDone0(Object result) {
753         return result != null && result != UNCANCELLABLE;
754     }
755 
756     private static final class CauseHolder {
757         final Throwable cause;
758         CauseHolder(Throwable cause) {
759             this.cause = cause;
760         }
761     }
762 
763     private static void safeExecute(EventExecutor executor, Runnable task) {
764         try {
765             executor.execute(task);
766         } catch (Throwable t) {
767             rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
768         }
769     }
770 }