1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.InternalThreadLocalMap;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.StringUtil;
21 import io.netty.util.internal.SystemPropertyUtil;
22 import io.netty.util.internal.ThrowableUtil;
23 import io.netty.util.internal.logging.InternalLogger;
24 import io.netty.util.internal.logging.InternalLoggerFactory;
25
26 import java.util.Locale;
27 import java.util.concurrent.CancellationException;
28 import java.util.concurrent.CompletionException;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
33
34 import static io.netty.util.internal.ObjectUtil.checkNotNull;
35 import static java.util.concurrent.TimeUnit.MILLISECONDS;
36
37 public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
38
39
40
41
42
43
44
45
46
/**
 * Name of the system property read to configure {@code MAX_LISTENER_STACK_DEPTH}.
 */
public static final String PROPERTY_MAX_LISTENER_STACK_DEPTH = "io.netty.defaultPromise.maxListenerStackDepth";

private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
// Separate logger (class name + ".rejectedExecution") used by safeExecute(...) when a notification
// task cannot be handed to the executor.
private static final InternalLogger rejectedExecutionLogger =
        InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
// Limit on nested in-event-loop listener notifications. Math.min caps the configured value at 8,
// so the property can only lower the limit.
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
        SystemPropertyUtil.getInt(PROPERTY_MAX_LISTENER_STACK_DEPTH, 8));
// Atomic accessor for the volatile "result" field; every state transition goes through compareAndSet.
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// Sentinel stored in "result" when the promise succeeds with a null value (see setSuccess0).
private static final Object SUCCESS = new Object();
// Sentinel stored in "result" by setUncancellable(); the promise is still incomplete in this state.
private static final Object UNCANCELLABLE = new Object();
// Shared holder installed by cancel(...); cause0(...) swaps it for a per-promise holder on first access.
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
        StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
// Stack trace of the shared cancellation cause, reused by LeanCancellationException.fillInStackTrace().
private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
62
// Completion state. One of: null (incomplete and cancellable), UNCANCELLABLE (incomplete),
// SUCCESS (completed with a null value), a CauseHolder (failed or cancelled), or the non-null success value.
private volatile Object result;
// Executor returned by executor(); null when the protected no-arg constructor was used.
private final EventExecutor executor;

// A single registered listener, or null. addListener0(...) moves it into "listeners" when a second
// listener is added, so at most one of "listener" and "listeners" is non-null.
private GenericFutureListener<? extends Future<?>> listener;
private DefaultFutureListeners listeners;

// Number of threads currently blocked in wait() on this promise; changed via incWaiters()/decWaiters().
private short waiters;

// True while notifyListenersNow() is delivering notifications; read and written inside synchronized (this).
private boolean notifyingListeners;
84
85
86
87
88
89
90
91
92
93
94
95
96
97 public DefaultPromise(EventExecutor executor) {
98 this.executor = checkNotNull(executor, "executor");
99 }
100
101
102
103
/**
 * Creates a new, incomplete promise without an executor.
 * With this constructor {@link #executor()} returns {@code null} unless a subclass overrides it.
 */
protected DefaultPromise() {
    // NOTE(review): notifyListeners() dereferences executor(), so subclasses using this
    // constructor presumably override executor() — confirm against the subclasses.
    this.executor = null;
}
108
109 @Override
110 public Promise<V> setSuccess(V result) {
111 if (setSuccess0(result)) {
112 return this;
113 }
114 throw new IllegalStateException("complete already: " + this);
115 }
116
117 @Override
118 public boolean trySuccess(V result) {
119 return setSuccess0(result);
120 }
121
122 @Override
123 public Promise<V> setFailure(Throwable cause) {
124 if (setFailure0(cause)) {
125 return this;
126 }
127 throw new IllegalStateException("complete already: " + this, cause);
128 }
129
130 @Override
131 public boolean tryFailure(Throwable cause) {
132 return setFailure0(cause);
133 }
134
135 @Override
136 public boolean setUncancellable() {
137 if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
138 return true;
139 }
140 Object result = this.result;
141 return !isDone0(result) || !isCancelled0(result);
142 }
143
144 @Override
145 public boolean isSuccess() {
146 Object result = this.result;
147 return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
148 }
149
150 @Override
151 public boolean isCancellable() {
152 return result == null;
153 }
154
155 private static final class LeanCancellationException extends CancellationException {
156 private static final long serialVersionUID = 2794674970981187807L;
157
158
159 @Override
160 public Throwable fillInStackTrace() {
161 setStackTrace(CANCELLATION_STACK);
162 return this;
163 }
164
165 @Override
166 public String toString() {
167 return CancellationException.class.getName();
168 }
169 }
170
171 @Override
172 public Throwable cause() {
173 return cause0(result);
174 }
175
176 private Throwable cause0(Object result) {
177 if (!(result instanceof CauseHolder)) {
178 return null;
179 }
180 if (result == CANCELLATION_CAUSE_HOLDER) {
181 CancellationException ce = new LeanCancellationException();
182 if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
183 return ce;
184 }
185 result = this.result;
186 }
187 return ((CauseHolder) result).cause;
188 }
189
190 @Override
191 public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
192 checkNotNull(listener, "listener");
193
194 synchronized (this) {
195 addListener0(listener);
196 }
197
198 if (isDone()) {
199 notifyListeners();
200 }
201
202 return this;
203 }
204
205 @Override
206 public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
207 checkNotNull(listeners, "listeners");
208
209 synchronized (this) {
210 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
211 if (listener == null) {
212 break;
213 }
214 addListener0(listener);
215 }
216 }
217
218 if (isDone()) {
219 notifyListeners();
220 }
221
222 return this;
223 }
224
225 @Override
226 public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
227 checkNotNull(listener, "listener");
228
229 synchronized (this) {
230 removeListener0(listener);
231 }
232
233 return this;
234 }
235
236 @Override
237 public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
238 checkNotNull(listeners, "listeners");
239
240 synchronized (this) {
241 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
242 if (listener == null) {
243 break;
244 }
245 removeListener0(listener);
246 }
247 }
248
249 return this;
250 }
251
252 @Override
253 public Promise<V> await() throws InterruptedException {
254 if (isDone()) {
255 return this;
256 }
257
258 if (Thread.interrupted()) {
259 throw new InterruptedException(toString());
260 }
261
262 checkDeadLock();
263
264 synchronized (this) {
265 while (!isDone()) {
266 incWaiters();
267 try {
268 wait();
269 } finally {
270 decWaiters();
271 }
272 }
273 }
274 return this;
275 }
276
277 @Override
278 public Promise<V> awaitUninterruptibly() {
279 if (isDone()) {
280 return this;
281 }
282
283 checkDeadLock();
284
285 boolean interrupted = false;
286 synchronized (this) {
287 while (!isDone()) {
288 incWaiters();
289 try {
290 wait();
291 } catch (InterruptedException e) {
292
293 interrupted = true;
294 } finally {
295 decWaiters();
296 }
297 }
298 }
299
300 if (interrupted) {
301 Thread.currentThread().interrupt();
302 }
303
304 return this;
305 }
306
307 @Override
308 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
309 return await0(unit.toNanos(timeout), true);
310 }
311
312 @Override
313 public boolean await(long timeoutMillis) throws InterruptedException {
314 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
315 }
316
317 @Override
318 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
319 try {
320 return await0(unit.toNanos(timeout), false);
321 } catch (InterruptedException e) {
322
323 throw new InternalError();
324 }
325 }
326
327 @Override
328 public boolean awaitUninterruptibly(long timeoutMillis) {
329 try {
330 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
331 } catch (InterruptedException e) {
332
333 throw new InternalError();
334 }
335 }
336
337 @SuppressWarnings("unchecked")
338 @Override
339 public V getNow() {
340 Object result = this.result;
341 if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
342 return null;
343 }
344 return (V) result;
345 }
346
347 @SuppressWarnings("unchecked")
348 @Override
349 public V get() throws InterruptedException, ExecutionException {
350 Object result = this.result;
351 if (!isDone0(result)) {
352 await();
353 result = this.result;
354 }
355 if (result == SUCCESS || result == UNCANCELLABLE) {
356 return null;
357 }
358 Throwable cause = cause0(result);
359 if (cause == null) {
360 return (V) result;
361 }
362 if (cause instanceof CancellationException) {
363 throw (CancellationException) cause;
364 }
365 throw new ExecutionException(cause);
366 }
367
368 @SuppressWarnings("unchecked")
369 @Override
370 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
371 Object result = this.result;
372 if (!isDone0(result)) {
373 if (!await(timeout, unit)) {
374 throw new TimeoutException("timeout after " + timeout + " " + unit.name().toLowerCase(Locale.ENGLISH));
375 }
376 result = this.result;
377 }
378 if (result == SUCCESS || result == UNCANCELLABLE) {
379 return null;
380 }
381 Throwable cause = cause0(result);
382 if (cause == null) {
383 return (V) result;
384 }
385 if (cause instanceof CancellationException) {
386 throw (CancellationException) cause;
387 }
388 throw new ExecutionException(cause);
389 }
390
391
392
393
394
395
396 @Override
397 public boolean cancel(boolean mayInterruptIfRunning) {
398 if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
399 if (checkNotifyWaiters()) {
400 notifyListeners();
401 }
402 return true;
403 }
404 return false;
405 }
406
407 @Override
408 public boolean isCancelled() {
409 return isCancelled0(result);
410 }
411
412 @Override
413 public boolean isDone() {
414 return isDone0(result);
415 }
416
/**
 * Waits for this promise to complete and rethrows its failure cause, if any.
 *
 * @return this promise when it completed without a cause
 * @throws InterruptedException if the calling thread was interrupted while waiting
 */
@Override
public Promise<V> sync() throws InterruptedException {
    await();
    // Throws the stored cause (via rethrowIfFailed) when the promise failed or was cancelled.
    rethrowIfFailed();
    return this;
}
423
/**
 * Waits uninterruptibly for this promise to complete and rethrows its failure cause, if any.
 *
 * @return this promise when it completed without a cause
 */
@Override
public Promise<V> syncUninterruptibly() {
    awaitUninterruptibly();
    // Throws the stored cause (via rethrowIfFailed) when the promise failed or was cancelled.
    rethrowIfFailed();
    return this;
}
430
431 @Override
432 public String toString() {
433 return toStringBuilder().toString();
434 }
435
436 protected StringBuilder toStringBuilder() {
437 StringBuilder buf = new StringBuilder(64)
438 .append(StringUtil.simpleClassName(this))
439 .append('@')
440 .append(Integer.toHexString(hashCode()));
441
442 Object result = this.result;
443 if (result == SUCCESS) {
444 buf.append("(success)");
445 } else if (result == UNCANCELLABLE) {
446 buf.append("(uncancellable)");
447 } else if (result instanceof CauseHolder) {
448 buf.append("(failure: ")
449 .append(((CauseHolder) result).cause)
450 .append(')');
451 } else if (result != null) {
452 buf.append("(success: ")
453 .append(result)
454 .append(')');
455 } else {
456 buf.append("(incomplete)");
457 }
458
459 return buf;
460 }
461
462
463
464
465
466
467
468
469
470 protected EventExecutor executor() {
471 return executor;
472 }
473
474 protected void checkDeadLock() {
475 EventExecutor e = executor();
476 if (e != null && e.inEventLoop()) {
477 throw new BlockingOperationException(toString());
478 }
479 }
480
481
482
483
484
485
486
487
488
489
490 protected static void notifyListener(
491 EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
492 notifyListenerWithStackOverFlowProtection(
493 checkNotNull(eventExecutor, "eventExecutor"),
494 checkNotNull(future, "future"),
495 checkNotNull(listener, "listener"));
496 }
497
498 private void notifyListeners() {
499 EventExecutor executor = executor();
500 if (executor.inEventLoop()) {
501 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
502 final int stackDepth = threadLocals.futureListenerStackDepth();
503 if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
504 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
505 try {
506 notifyListenersNow();
507 } finally {
508 threadLocals.setFutureListenerStackDepth(stackDepth);
509 }
510 return;
511 }
512 }
513
514 safeExecute(executor, new Runnable() {
515 @Override
516 public void run() {
517 notifyListenersNow();
518 }
519 });
520 }
521
522
523
524
525
526
527 private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
528 final Future<?> future,
529 final GenericFutureListener<?> listener) {
530 if (executor.inEventLoop()) {
531 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
532 final int stackDepth = threadLocals.futureListenerStackDepth();
533 if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
534 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
535 try {
536 notifyListener0(future, listener);
537 } finally {
538 threadLocals.setFutureListenerStackDepth(stackDepth);
539 }
540 return;
541 }
542 }
543
544 safeExecute(executor, new Runnable() {
545 @Override
546 public void run() {
547 notifyListener0(future, listener);
548 }
549 });
550 }
551
/**
 * Delivers completion notifications to all registered listeners.
 * Listeners are detached from this promise while holding its monitor and invoked after the
 * monitor is released; the loop repeats until no listener was added in the meantime.
 */
private void notifyListenersNow() {
    GenericFutureListener listener;
    DefaultFutureListeners listeners;
    synchronized (this) {
        listener = this.listener;
        listeners = this.listeners;

        // Bail out if a notification pass is already running or there is nothing to notify.
        if (notifyingListeners || (listener == null && listeners == null)) {
            return;
        }
        notifyingListeners = true;
        // Detach whichever holder is in use so listeners added later form a new batch.
        if (listener != null) {
            this.listener = null;
        } else {
            this.listeners = null;
        }
    }
    for (;;) {
        // Invoke the detached batch outside the synchronized block.
        if (listener != null) {
            notifyListener0(this, listener);
        } else {
            notifyListeners0(listeners);
        }
        synchronized (this) {
            if (this.listener == null && this.listeners == null) {
                // notifyListener0 catches every Throwable, so the flag is reset here
                // without needing a finally block.
                notifyingListeners = false;
                return;
            }
            // Listeners were added during notification: detach them and go around again.
            listener = this.listener;
            listeners = this.listeners;
            if (listener != null) {
                this.listener = null;
            } else {
                this.listeners = null;
            }
        }
    }
}
592
593 private void notifyListeners0(DefaultFutureListeners listeners) {
594 GenericFutureListener<?>[] a = listeners.listeners();
595 int size = listeners.size();
596 for (int i = 0; i < size; i ++) {
597 notifyListener0(this, a[i]);
598 }
599 }
600
601 @SuppressWarnings({ "unchecked", "rawtypes" })
602 private static void notifyListener0(Future future, GenericFutureListener l) {
603 try {
604 l.operationComplete(future);
605 } catch (Throwable t) {
606 if (logger.isWarnEnabled()) {
607 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
608 }
609 }
610 }
611
612 private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
613 if (this.listener == null) {
614 if (listeners == null) {
615 this.listener = listener;
616 } else {
617 listeners.add(listener);
618 }
619 } else {
620 assert listeners == null;
621 listeners = new DefaultFutureListeners(this.listener, listener);
622 this.listener = null;
623 }
624 }
625
626 private void removeListener0(GenericFutureListener<? extends Future<? super V>> toRemove) {
627 if (listener == toRemove) {
628 listener = null;
629 } else if (listeners != null) {
630 listeners.remove(toRemove);
631
632 if (listeners.size() == 0) {
633 listeners = null;
634 }
635 }
636 }
637
638 private boolean setSuccess0(V result) {
639 return setValue0(result == null ? SUCCESS : result);
640 }
641
642 private boolean setFailure0(Throwable cause) {
643 return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
644 }
645
646 private boolean setValue0(Object objResult) {
647 if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
648 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
649 if (checkNotifyWaiters()) {
650 notifyListeners();
651 }
652 return true;
653 }
654 return false;
655 }
656
657
658
659
660
661 private synchronized boolean checkNotifyWaiters() {
662 if (waiters > 0) {
663 notifyAll();
664 }
665 return listener != null || listeners != null;
666 }
667
668 private void incWaiters() {
669 if (waiters == Short.MAX_VALUE) {
670 throw new IllegalStateException("too many waiters: " + this);
671 }
672 ++waiters;
673 }
674
675 private void decWaiters() {
676 --waiters;
677 }
678
679 private void rethrowIfFailed() {
680 Throwable cause = cause();
681 if (cause == null) {
682 return;
683 }
684
685 if (!(cause instanceof CancellationException) && ThrowableUtil.getSuppressed(cause).length == 0) {
686 ThrowableUtil.addSuppressed(cause, new RuntimeException("Rethrowing promise failure cause"));
687 }
688 PlatformDependent.throwException(cause);
689 }
690
/**
 * Waits for this promise to complete for at most the given number of nanoseconds.
 *
 * @param timeoutNanos the maximum time to wait, in nanoseconds; a value of zero or less never blocks
 * @param interruptable if {@code true}, an interruption is propagated as {@link InterruptedException};
 *                      if {@code false}, it is remembered and the interrupt flag is set again on exit
 * @return {@code true} if the promise is complete on return
 * @throws InterruptedException only when {@code interruptable} is {@code true} and the thread was
 *                              interrupted before or while waiting
 */
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
    if (isDone()) {
        return true;
    }

    // No time budget: report the current state without blocking.
    if (timeoutNanos <= 0) {
        return isDone();
    }

    if (interruptable && Thread.interrupted()) {
        throw new InterruptedException(toString());
    }

    checkDeadLock();

    // The clock is first read here, after the early-exit checks above, so those
    // fast paths never call System.nanoTime().
    final long startTime = System.nanoTime();
    synchronized (this) {
        boolean interrupted = false;
        try {
            long waitTime = timeoutNanos;
            while (!isDone() && waitTime > 0) {
                incWaiters();
                try {
                    // Split the remaining nanoseconds into the (millis, nanos) pair wait() expects.
                    wait(waitTime / 1000000, (int) (waitTime % 1000000));
                } catch (InterruptedException e) {
                    if (interruptable) {
                        throw e;
                    } else {
                        interrupted = true;
                    }
                } finally {
                    decWaiters();
                }

                // Check completion first so the elapsed-time computation is skipped when done.
                if (isDone()) {
                    return true;
                }

                // Remaining budget = total timeout minus the time elapsed since startTime.
                waitTime = timeoutNanos - (System.nanoTime() - startTime);
            }
            return isDone();
        } finally {
            // Re-assert an interruption that was swallowed in non-interruptable mode.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
742
743
744
745
746
747
748
749
750
751
752
753 @SuppressWarnings("unchecked")
754 void notifyProgressiveListeners(final long progress, final long total) {
755 final Object listeners = progressiveListeners();
756 if (listeners == null) {
757 return;
758 }
759
760 final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
761
762 EventExecutor executor = executor();
763 if (executor.inEventLoop()) {
764 if (listeners instanceof GenericProgressiveFutureListener[]) {
765 notifyProgressiveListeners0(
766 self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
767 } else {
768 notifyProgressiveListener0(
769 self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
770 }
771 } else {
772 if (listeners instanceof GenericProgressiveFutureListener[]) {
773 final GenericProgressiveFutureListener<?>[] array =
774 (GenericProgressiveFutureListener<?>[]) listeners;
775 safeExecute(executor, new Runnable() {
776 @Override
777 public void run() {
778 notifyProgressiveListeners0(self, array, progress, total);
779 }
780 });
781 } else {
782 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
783 (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
784 safeExecute(executor, new Runnable() {
785 @Override
786 public void run() {
787 notifyProgressiveListener0(self, l, progress, total);
788 }
789 });
790 }
791 }
792 }
793
794
795
796
797
798 private synchronized Object progressiveListeners() {
799 final GenericFutureListener listener = this.listener;
800 final DefaultFutureListeners listeners = this.listeners;
801 if (listener == null && listeners == null) {
802
803 return null;
804 }
805
806 if (listeners != null) {
807
808 DefaultFutureListeners dfl = listeners;
809 int progressiveSize = dfl.progressiveSize();
810 switch (progressiveSize) {
811 case 0:
812 return null;
813 case 1:
814 for (GenericFutureListener<?> l: dfl.listeners()) {
815 if (l instanceof GenericProgressiveFutureListener) {
816 return l;
817 }
818 }
819 return null;
820 }
821
822 GenericFutureListener<?>[] array = dfl.listeners();
823 GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
824 for (int i = 0, j = 0; j < progressiveSize; i ++) {
825 GenericFutureListener<?> l = array[i];
826 if (l instanceof GenericProgressiveFutureListener) {
827 copy[j ++] = (GenericProgressiveFutureListener<?>) l;
828 }
829 }
830
831 return copy;
832 } else if (listener instanceof GenericProgressiveFutureListener) {
833 return listener;
834 } else {
835
836 return null;
837 }
838 }
839
840 private static void notifyProgressiveListeners0(
841 ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
842 for (GenericProgressiveFutureListener<?> l: listeners) {
843 if (l == null) {
844 break;
845 }
846 notifyProgressiveListener0(future, l, progress, total);
847 }
848 }
849
850 @SuppressWarnings({ "unchecked", "rawtypes" })
851 private static void notifyProgressiveListener0(
852 ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
853 try {
854 l.operationProgressed(future, progress, total);
855 } catch (Throwable t) {
856 if (logger.isWarnEnabled()) {
857 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
858 }
859 }
860 }
861
862 private static boolean isCancelled0(Object result) {
863 return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
864 }
865
866 private static boolean isDone0(Object result) {
867 return result != null && result != UNCANCELLABLE;
868 }
869
870 private static final class CauseHolder {
871 final Throwable cause;
872 CauseHolder(Throwable cause) {
873 this.cause = cause;
874 }
875 }
876
877 private static void safeExecute(EventExecutor executor, Runnable task) {
878 try {
879 executor.execute(task);
880 } catch (Throwable t) {
881 rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
882 }
883 }
884
885 private static final class StacklessCancellationException extends CancellationException {
886
887 private static final long serialVersionUID = -2974906711413716191L;
888
889 private StacklessCancellationException() { }
890
891
892
893 @Override
894 public Throwable fillInStackTrace() {
895 return this;
896 }
897
898 static StacklessCancellationException newInstance(Class<?> clazz, String method) {
899 return ThrowableUtil.unknownStackTrace(new StacklessCancellationException(), clazz, method);
900 }
901 }
902 }