1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.InternalThreadLocalMap;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.StringUtil;
21 import io.netty.util.internal.SystemPropertyUtil;
22 import io.netty.util.internal.ThrowableUtil;
23 import io.netty.util.internal.logging.InternalLogger;
24 import io.netty.util.internal.logging.InternalLoggerFactory;
25
26 import java.util.concurrent.CancellationException;
27 import java.util.concurrent.CompletionException;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.TimeoutException;
31 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
32
33 import static io.netty.util.internal.ObjectUtil.checkNotNull;
34 import static java.util.concurrent.TimeUnit.MILLISECONDS;
35
36 public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
37
38
39
40
41
42
43
44
45
46 public static final String PROPERTY_MAX_LISTENER_STACK_DEPTH = "io.netty.defaultPromise.maxListenerStackDepth";
47
48 private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
49 private static final InternalLogger rejectedExecutionLogger =
50 InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
51 private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
52 SystemPropertyUtil.getInt(PROPERTY_MAX_LISTENER_STACK_DEPTH, 8));
53 @SuppressWarnings("rawtypes")
54 private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
55 AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
56 private static final Object SUCCESS = new Object();
57 private static final Object UNCANCELLABLE = new Object();
58 private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
59 StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
60 private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
61
62 private volatile Object result;
63 private final EventExecutor executor;
64
65
66
67
68
69
70
71 private GenericFutureListener<? extends Future<?>> listener;
72 private DefaultFutureListeners listeners;
73
74
75
76 private short waiters;
77
78
79
80
81
82 private boolean notifyingListeners;
83
84
85
86
87
88
89
90
91
92
93
94
95
96 public DefaultPromise(EventExecutor executor) {
97 this.executor = checkNotNull(executor, "executor");
98 }
99
100
101
102
103 protected DefaultPromise() {
104
105 executor = null;
106 }
107
108 @Override
109 public Promise<V> setSuccess(V result) {
110 if (setSuccess0(result)) {
111 return this;
112 }
113 throw new IllegalStateException("complete already: " + this);
114 }
115
116 @Override
117 public boolean trySuccess(V result) {
118 return setSuccess0(result);
119 }
120
121 @Override
122 public Promise<V> setFailure(Throwable cause) {
123 if (setFailure0(cause)) {
124 return this;
125 }
126 throw new IllegalStateException("complete already: " + this, cause);
127 }
128
129 @Override
130 public boolean tryFailure(Throwable cause) {
131 return setFailure0(cause);
132 }
133
134 @Override
135 public boolean setUncancellable() {
136 if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
137 return true;
138 }
139 Object result = this.result;
140 return !isDone0(result) || !isCancelled0(result);
141 }
142
143 @Override
144 public boolean isSuccess() {
145 Object result = this.result;
146 return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
147 }
148
149 @Override
150 public boolean isCancellable() {
151 return result == null;
152 }
153
154 private static final class LeanCancellationException extends CancellationException {
155 private static final long serialVersionUID = 2794674970981187807L;
156
157
158 @Override
159 public Throwable fillInStackTrace() {
160 setStackTrace(CANCELLATION_STACK);
161 return this;
162 }
163
164 @Override
165 public String toString() {
166 return CancellationException.class.getName();
167 }
168 }
169
170 @Override
171 public Throwable cause() {
172 return cause0(result);
173 }
174
175 private Throwable cause0(Object result) {
176 if (!(result instanceof CauseHolder)) {
177 return null;
178 }
179 if (result == CANCELLATION_CAUSE_HOLDER) {
180 CancellationException ce = new LeanCancellationException();
181 if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
182 return ce;
183 }
184 result = this.result;
185 }
186 return ((CauseHolder) result).cause;
187 }
188
189 @Override
190 public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
191 checkNotNull(listener, "listener");
192
193 synchronized (this) {
194 addListener0(listener);
195 }
196
197 if (isDone()) {
198 notifyListeners();
199 }
200
201 return this;
202 }
203
204 @Override
205 public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
206 checkNotNull(listeners, "listeners");
207
208 synchronized (this) {
209 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
210 if (listener == null) {
211 break;
212 }
213 addListener0(listener);
214 }
215 }
216
217 if (isDone()) {
218 notifyListeners();
219 }
220
221 return this;
222 }
223
224 @Override
225 public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
226 checkNotNull(listener, "listener");
227
228 synchronized (this) {
229 removeListener0(listener);
230 }
231
232 return this;
233 }
234
235 @Override
236 public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
237 checkNotNull(listeners, "listeners");
238
239 synchronized (this) {
240 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
241 if (listener == null) {
242 break;
243 }
244 removeListener0(listener);
245 }
246 }
247
248 return this;
249 }
250
251 @Override
252 public Promise<V> await() throws InterruptedException {
253 if (isDone()) {
254 return this;
255 }
256
257 if (Thread.interrupted()) {
258 throw new InterruptedException(toString());
259 }
260
261 checkDeadLock();
262
263 synchronized (this) {
264 while (!isDone()) {
265 incWaiters();
266 try {
267 wait();
268 } finally {
269 decWaiters();
270 }
271 }
272 }
273 return this;
274 }
275
276 @Override
277 public Promise<V> awaitUninterruptibly() {
278 if (isDone()) {
279 return this;
280 }
281
282 checkDeadLock();
283
284 boolean interrupted = false;
285 synchronized (this) {
286 while (!isDone()) {
287 incWaiters();
288 try {
289 wait();
290 } catch (InterruptedException e) {
291
292 interrupted = true;
293 } finally {
294 decWaiters();
295 }
296 }
297 }
298
299 if (interrupted) {
300 Thread.currentThread().interrupt();
301 }
302
303 return this;
304 }
305
306 @Override
307 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
308 return await0(unit.toNanos(timeout), true);
309 }
310
311 @Override
312 public boolean await(long timeoutMillis) throws InterruptedException {
313 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
314 }
315
316 @Override
317 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
318 try {
319 return await0(unit.toNanos(timeout), false);
320 } catch (InterruptedException e) {
321
322 throw new InternalError();
323 }
324 }
325
326 @Override
327 public boolean awaitUninterruptibly(long timeoutMillis) {
328 try {
329 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
330 } catch (InterruptedException e) {
331
332 throw new InternalError();
333 }
334 }
335
336 @SuppressWarnings("unchecked")
337 @Override
338 public V getNow() {
339 Object result = this.result;
340 if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
341 return null;
342 }
343 return (V) result;
344 }
345
346 @SuppressWarnings("unchecked")
347 @Override
348 public V get() throws InterruptedException, ExecutionException {
349 Object result = this.result;
350 if (!isDone0(result)) {
351 await();
352 result = this.result;
353 }
354 if (result == SUCCESS || result == UNCANCELLABLE) {
355 return null;
356 }
357 Throwable cause = cause0(result);
358 if (cause == null) {
359 return (V) result;
360 }
361 if (cause instanceof CancellationException) {
362 throw (CancellationException) cause;
363 }
364 throw new ExecutionException(cause);
365 }
366
367 @SuppressWarnings("unchecked")
368 @Override
369 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
370 Object result = this.result;
371 if (!isDone0(result)) {
372 if (!await(timeout, unit)) {
373 throw new TimeoutException();
374 }
375 result = this.result;
376 }
377 if (result == SUCCESS || result == UNCANCELLABLE) {
378 return null;
379 }
380 Throwable cause = cause0(result);
381 if (cause == null) {
382 return (V) result;
383 }
384 if (cause instanceof CancellationException) {
385 throw (CancellationException) cause;
386 }
387 throw new ExecutionException(cause);
388 }
389
390
391
392
393
394
395 @Override
396 public boolean cancel(boolean mayInterruptIfRunning) {
397 if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
398 if (checkNotifyWaiters()) {
399 notifyListeners();
400 }
401 return true;
402 }
403 return false;
404 }
405
406 @Override
407 public boolean isCancelled() {
408 return isCancelled0(result);
409 }
410
411 @Override
412 public boolean isDone() {
413 return isDone0(result);
414 }
415
416 @Override
417 public Promise<V> sync() throws InterruptedException {
418 await();
419 rethrowIfFailed();
420 return this;
421 }
422
423 @Override
424 public Promise<V> syncUninterruptibly() {
425 awaitUninterruptibly();
426 rethrowIfFailed();
427 return this;
428 }
429
430 @Override
431 public String toString() {
432 return toStringBuilder().toString();
433 }
434
435 protected StringBuilder toStringBuilder() {
436 StringBuilder buf = new StringBuilder(64)
437 .append(StringUtil.simpleClassName(this))
438 .append('@')
439 .append(Integer.toHexString(hashCode()));
440
441 Object result = this.result;
442 if (result == SUCCESS) {
443 buf.append("(success)");
444 } else if (result == UNCANCELLABLE) {
445 buf.append("(uncancellable)");
446 } else if (result instanceof CauseHolder) {
447 buf.append("(failure: ")
448 .append(((CauseHolder) result).cause)
449 .append(')');
450 } else if (result != null) {
451 buf.append("(success: ")
452 .append(result)
453 .append(')');
454 } else {
455 buf.append("(incomplete)");
456 }
457
458 return buf;
459 }
460
461
462
463
464
465
466
467
468
469 protected EventExecutor executor() {
470 return executor;
471 }
472
473 protected void checkDeadLock() {
474 EventExecutor e = executor();
475 if (e != null && e.inEventLoop()) {
476 throw new BlockingOperationException(toString());
477 }
478 }
479
480
481
482
483
484
485
486
487
488
489 protected static void notifyListener(
490 EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
491 notifyListenerWithStackOverFlowProtection(
492 checkNotNull(eventExecutor, "eventExecutor"),
493 checkNotNull(future, "future"),
494 checkNotNull(listener, "listener"));
495 }
496
497 private void notifyListeners() {
498 EventExecutor executor = executor();
499 if (executor.inEventLoop()) {
500 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
501 final int stackDepth = threadLocals.futureListenerStackDepth();
502 if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
503 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
504 try {
505 notifyListenersNow();
506 } finally {
507 threadLocals.setFutureListenerStackDepth(stackDepth);
508 }
509 return;
510 }
511 }
512
513 safeExecute(executor, new Runnable() {
514 @Override
515 public void run() {
516 notifyListenersNow();
517 }
518 });
519 }
520
521
522
523
524
525
526 private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
527 final Future<?> future,
528 final GenericFutureListener<?> listener) {
529 if (executor.inEventLoop()) {
530 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
531 final int stackDepth = threadLocals.futureListenerStackDepth();
532 if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
533 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
534 try {
535 notifyListener0(future, listener);
536 } finally {
537 threadLocals.setFutureListenerStackDepth(stackDepth);
538 }
539 return;
540 }
541 }
542
543 safeExecute(executor, new Runnable() {
544 @Override
545 public void run() {
546 notifyListener0(future, listener);
547 }
548 });
549 }
550
551 private void notifyListenersNow() {
552 GenericFutureListener listener;
553 DefaultFutureListeners listeners;
554 synchronized (this) {
555 listener = this.listener;
556 listeners = this.listeners;
557
558 if (notifyingListeners || (listener == null && listeners == null)) {
559 return;
560 }
561 notifyingListeners = true;
562 if (listener != null) {
563 this.listener = null;
564 } else {
565 this.listeners = null;
566 }
567 }
568 for (;;) {
569 if (listener != null) {
570 notifyListener0(this, listener);
571 } else {
572 notifyListeners0(listeners);
573 }
574 synchronized (this) {
575 if (this.listener == null && this.listeners == null) {
576
577
578 notifyingListeners = false;
579 return;
580 }
581 listener = this.listener;
582 listeners = this.listeners;
583 if (listener != null) {
584 this.listener = null;
585 } else {
586 this.listeners = null;
587 }
588 }
589 }
590 }
591
592 private void notifyListeners0(DefaultFutureListeners listeners) {
593 GenericFutureListener<?>[] a = listeners.listeners();
594 int size = listeners.size();
595 for (int i = 0; i < size; i ++) {
596 notifyListener0(this, a[i]);
597 }
598 }
599
600 @SuppressWarnings({ "unchecked", "rawtypes" })
601 private static void notifyListener0(Future future, GenericFutureListener l) {
602 try {
603 l.operationComplete(future);
604 } catch (Throwable t) {
605 if (logger.isWarnEnabled()) {
606 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
607 }
608 }
609 }
610
611 private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
612 if (this.listener == null) {
613 if (listeners == null) {
614 this.listener = listener;
615 } else {
616 listeners.add(listener);
617 }
618 } else {
619 assert listeners == null;
620 listeners = new DefaultFutureListeners(this.listener, listener);
621 this.listener = null;
622 }
623 }
624
625 private void removeListener0(GenericFutureListener<? extends Future<? super V>> toRemove) {
626 if (listener == toRemove) {
627 listener = null;
628 } else if (listeners != null) {
629 listeners.remove(toRemove);
630
631 if (listeners.size() == 0) {
632 listeners = null;
633 }
634 }
635 }
636
637 private boolean setSuccess0(V result) {
638 return setValue0(result == null ? SUCCESS : result);
639 }
640
641 private boolean setFailure0(Throwable cause) {
642 return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
643 }
644
645 private boolean setValue0(Object objResult) {
646 if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
647 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
648 if (checkNotifyWaiters()) {
649 notifyListeners();
650 }
651 return true;
652 }
653 return false;
654 }
655
656
657
658
659
660 private synchronized boolean checkNotifyWaiters() {
661 if (waiters > 0) {
662 notifyAll();
663 }
664 return listener != null || listeners != null;
665 }
666
667 private void incWaiters() {
668 if (waiters == Short.MAX_VALUE) {
669 throw new IllegalStateException("too many waiters: " + this);
670 }
671 ++waiters;
672 }
673
674 private void decWaiters() {
675 --waiters;
676 }
677
678 private void rethrowIfFailed() {
679 Throwable cause = cause();
680 if (cause == null) {
681 return;
682 }
683
684 if (!(cause instanceof CancellationException) && cause.getSuppressed().length == 0) {
685 cause.addSuppressed(new CompletionException("Rethrowing promise failure cause", null));
686 }
687 PlatformDependent.throwException(cause);
688 }
689
690 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
691 if (isDone()) {
692 return true;
693 }
694
695 if (timeoutNanos <= 0) {
696 return isDone();
697 }
698
699 if (interruptable && Thread.interrupted()) {
700 throw new InterruptedException(toString());
701 }
702
703 checkDeadLock();
704
705
706
707 final long startTime = System.nanoTime();
708 synchronized (this) {
709 boolean interrupted = false;
710 try {
711 long waitTime = timeoutNanos;
712 while (!isDone() && waitTime > 0) {
713 incWaiters();
714 try {
715 wait(waitTime / 1000000, (int) (waitTime % 1000000));
716 } catch (InterruptedException e) {
717 if (interruptable) {
718 throw e;
719 } else {
720 interrupted = true;
721 }
722 } finally {
723 decWaiters();
724 }
725
726 if (isDone()) {
727 return true;
728 }
729
730
731 waitTime = timeoutNanos - (System.nanoTime() - startTime);
732 }
733 return isDone();
734 } finally {
735 if (interrupted) {
736 Thread.currentThread().interrupt();
737 }
738 }
739 }
740 }
741
742
743
744
745
746
747
748
749
750
751
752 @SuppressWarnings("unchecked")
753 void notifyProgressiveListeners(final long progress, final long total) {
754 final Object listeners = progressiveListeners();
755 if (listeners == null) {
756 return;
757 }
758
759 final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
760
761 EventExecutor executor = executor();
762 if (executor.inEventLoop()) {
763 if (listeners instanceof GenericProgressiveFutureListener[]) {
764 notifyProgressiveListeners0(
765 self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
766 } else {
767 notifyProgressiveListener0(
768 self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
769 }
770 } else {
771 if (listeners instanceof GenericProgressiveFutureListener[]) {
772 final GenericProgressiveFutureListener<?>[] array =
773 (GenericProgressiveFutureListener<?>[]) listeners;
774 safeExecute(executor, new Runnable() {
775 @Override
776 public void run() {
777 notifyProgressiveListeners0(self, array, progress, total);
778 }
779 });
780 } else {
781 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
782 (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
783 safeExecute(executor, new Runnable() {
784 @Override
785 public void run() {
786 notifyProgressiveListener0(self, l, progress, total);
787 }
788 });
789 }
790 }
791 }
792
793
794
795
796
797 private synchronized Object progressiveListeners() {
798 final GenericFutureListener listener = this.listener;
799 final DefaultFutureListeners listeners = this.listeners;
800 if (listener == null && listeners == null) {
801
802 return null;
803 }
804
805 if (listeners != null) {
806
807 DefaultFutureListeners dfl = listeners;
808 int progressiveSize = dfl.progressiveSize();
809 switch (progressiveSize) {
810 case 0:
811 return null;
812 case 1:
813 for (GenericFutureListener<?> l: dfl.listeners()) {
814 if (l instanceof GenericProgressiveFutureListener) {
815 return l;
816 }
817 }
818 return null;
819 }
820
821 GenericFutureListener<?>[] array = dfl.listeners();
822 GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
823 for (int i = 0, j = 0; j < progressiveSize; i ++) {
824 GenericFutureListener<?> l = array[i];
825 if (l instanceof GenericProgressiveFutureListener) {
826 copy[j ++] = (GenericProgressiveFutureListener<?>) l;
827 }
828 }
829
830 return copy;
831 } else if (listener instanceof GenericProgressiveFutureListener) {
832 return listener;
833 } else {
834
835 return null;
836 }
837 }
838
839 private static void notifyProgressiveListeners0(
840 ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
841 for (GenericProgressiveFutureListener<?> l: listeners) {
842 if (l == null) {
843 break;
844 }
845 notifyProgressiveListener0(future, l, progress, total);
846 }
847 }
848
849 @SuppressWarnings({ "unchecked", "rawtypes" })
850 private static void notifyProgressiveListener0(
851 ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
852 try {
853 l.operationProgressed(future, progress, total);
854 } catch (Throwable t) {
855 if (logger.isWarnEnabled()) {
856 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
857 }
858 }
859 }
860
861 private static boolean isCancelled0(Object result) {
862 return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
863 }
864
865 private static boolean isDone0(Object result) {
866 return result != null && result != UNCANCELLABLE;
867 }
868
869 private static final class CauseHolder {
870 final Throwable cause;
871 CauseHolder(Throwable cause) {
872 this.cause = cause;
873 }
874 }
875
876 private static void safeExecute(EventExecutor executor, Runnable task) {
877 try {
878 executor.execute(task);
879 } catch (Throwable t) {
880 rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
881 }
882 }
883
884 private static final class StacklessCancellationException extends CancellationException {
885
886 private static final long serialVersionUID = -2974906711413716191L;
887
888 private StacklessCancellationException() { }
889
890
891
892 @Override
893 public Throwable fillInStackTrace() {
894 return this;
895 }
896
897 static StacklessCancellationException newInstance(Class<?> clazz, String method) {
898 return ThrowableUtil.unknownStackTrace(new StacklessCancellationException(), clazz, method);
899 }
900 }
901 }