1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.InternalThreadLocalMap;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.StringUtil;
21 import io.netty.util.internal.SystemPropertyUtil;
22 import io.netty.util.internal.ThrowableUtil;
23 import io.netty.util.internal.logging.InternalLogger;
24 import io.netty.util.internal.logging.InternalLoggerFactory;
25
26 import java.util.concurrent.CancellationException;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
31
32 import static io.netty.util.internal.ObjectUtil.checkNotNull;
33 import static java.util.concurrent.TimeUnit.MILLISECONDS;
34
35 public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
36 private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
37 private static final InternalLogger rejectedExecutionLogger =
38 InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
39 private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
40 SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
41 @SuppressWarnings("rawtypes")
42 private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
43 AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
44 private static final Object SUCCESS = new Object();
45 private static final Object UNCANCELLABLE = new Object();
46 private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
47 StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
48 private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
49
50 private volatile Object result;
51 private final EventExecutor executor;
52
53
54
55
56
57
58 private Object listeners;
59
60
61
62 private short waiters;
63
64
65
66
67
68 private boolean notifyingListeners;
69
70
71
72
73
74
75
76
77
78
79
80
81
82 public DefaultPromise(EventExecutor executor) {
83 this.executor = checkNotNull(executor, "executor");
84 }
85
86
87
88
89 protected DefaultPromise() {
90
91 executor = null;
92 }
93
94 @Override
95 public Promise<V> setSuccess(V result) {
96 if (setSuccess0(result)) {
97 return this;
98 }
99 throw new IllegalStateException("complete already: " + this);
100 }
101
102 @Override
103 public boolean trySuccess(V result) {
104 return setSuccess0(result);
105 }
106
107 @Override
108 public Promise<V> setFailure(Throwable cause) {
109 if (setFailure0(cause)) {
110 return this;
111 }
112 throw new IllegalStateException("complete already: " + this, cause);
113 }
114
115 @Override
116 public boolean tryFailure(Throwable cause) {
117 return setFailure0(cause);
118 }
119
120 @Override
121 public boolean setUncancellable() {
122 if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
123 return true;
124 }
125 Object result = this.result;
126 return !isDone0(result) || !isCancelled0(result);
127 }
128
129 @Override
130 public boolean isSuccess() {
131 Object result = this.result;
132 return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
133 }
134
135 @Override
136 public boolean isCancellable() {
137 return result == null;
138 }
139
140 private static final class LeanCancellationException extends CancellationException {
141 private static final long serialVersionUID = 2794674970981187807L;
142
143
144 @Override
145 public Throwable fillInStackTrace() {
146 setStackTrace(CANCELLATION_STACK);
147 return this;
148 }
149
150 @Override
151 public String toString() {
152 return CancellationException.class.getName();
153 }
154 }
155
156 @Override
157 public Throwable cause() {
158 return cause0(result);
159 }
160
161 private Throwable cause0(Object result) {
162 if (!(result instanceof CauseHolder)) {
163 return null;
164 }
165 if (result == CANCELLATION_CAUSE_HOLDER) {
166 CancellationException ce = new LeanCancellationException();
167 if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
168 return ce;
169 }
170 result = this.result;
171 }
172 return ((CauseHolder) result).cause;
173 }
174
175 @Override
176 public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
177 checkNotNull(listener, "listener");
178
179 synchronized (this) {
180 addListener0(listener);
181 }
182
183 if (isDone()) {
184 notifyListeners();
185 }
186
187 return this;
188 }
189
190 @Override
191 public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
192 checkNotNull(listeners, "listeners");
193
194 synchronized (this) {
195 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
196 if (listener == null) {
197 break;
198 }
199 addListener0(listener);
200 }
201 }
202
203 if (isDone()) {
204 notifyListeners();
205 }
206
207 return this;
208 }
209
210 @Override
211 public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
212 checkNotNull(listener, "listener");
213
214 synchronized (this) {
215 removeListener0(listener);
216 }
217
218 return this;
219 }
220
221 @Override
222 public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
223 checkNotNull(listeners, "listeners");
224
225 synchronized (this) {
226 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
227 if (listener == null) {
228 break;
229 }
230 removeListener0(listener);
231 }
232 }
233
234 return this;
235 }
236
237 @Override
238 public Promise<V> await() throws InterruptedException {
239 if (isDone()) {
240 return this;
241 }
242
243 if (Thread.interrupted()) {
244 throw new InterruptedException(toString());
245 }
246
247 checkDeadLock();
248
249 synchronized (this) {
250 while (!isDone()) {
251 incWaiters();
252 try {
253 wait();
254 } finally {
255 decWaiters();
256 }
257 }
258 }
259 return this;
260 }
261
262 @Override
263 public Promise<V> awaitUninterruptibly() {
264 if (isDone()) {
265 return this;
266 }
267
268 checkDeadLock();
269
270 boolean interrupted = false;
271 synchronized (this) {
272 while (!isDone()) {
273 incWaiters();
274 try {
275 wait();
276 } catch (InterruptedException e) {
277
278 interrupted = true;
279 } finally {
280 decWaiters();
281 }
282 }
283 }
284
285 if (interrupted) {
286 Thread.currentThread().interrupt();
287 }
288
289 return this;
290 }
291
292 @Override
293 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
294 return await0(unit.toNanos(timeout), true);
295 }
296
297 @Override
298 public boolean await(long timeoutMillis) throws InterruptedException {
299 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
300 }
301
302 @Override
303 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
304 try {
305 return await0(unit.toNanos(timeout), false);
306 } catch (InterruptedException e) {
307
308 throw new InternalError();
309 }
310 }
311
312 @Override
313 public boolean awaitUninterruptibly(long timeoutMillis) {
314 try {
315 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
316 } catch (InterruptedException e) {
317
318 throw new InternalError();
319 }
320 }
321
322 @SuppressWarnings("unchecked")
323 @Override
324 public V getNow() {
325 Object result = this.result;
326 if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
327 return null;
328 }
329 return (V) result;
330 }
331
332 @SuppressWarnings("unchecked")
333 @Override
334 public V get() throws InterruptedException, ExecutionException {
335 Object result = this.result;
336 if (!isDone0(result)) {
337 await();
338 result = this.result;
339 }
340 if (result == SUCCESS || result == UNCANCELLABLE) {
341 return null;
342 }
343 Throwable cause = cause0(result);
344 if (cause == null) {
345 return (V) result;
346 }
347 if (cause instanceof CancellationException) {
348 throw (CancellationException) cause;
349 }
350 throw new ExecutionException(cause);
351 }
352
353 @SuppressWarnings("unchecked")
354 @Override
355 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
356 Object result = this.result;
357 if (!isDone0(result)) {
358 if (!await(timeout, unit)) {
359 throw new TimeoutException();
360 }
361 result = this.result;
362 }
363 if (result == SUCCESS || result == UNCANCELLABLE) {
364 return null;
365 }
366 Throwable cause = cause0(result);
367 if (cause == null) {
368 return (V) result;
369 }
370 if (cause instanceof CancellationException) {
371 throw (CancellationException) cause;
372 }
373 throw new ExecutionException(cause);
374 }
375
376
377
378
379
380
381 @Override
382 public boolean cancel(boolean mayInterruptIfRunning) {
383 if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
384 if (checkNotifyWaiters()) {
385 notifyListeners();
386 }
387 return true;
388 }
389 return false;
390 }
391
392 @Override
393 public boolean isCancelled() {
394 return isCancelled0(result);
395 }
396
397 @Override
398 public boolean isDone() {
399 return isDone0(result);
400 }
401
402 @Override
403 public Promise<V> sync() throws InterruptedException {
404 await();
405 rethrowIfFailed();
406 return this;
407 }
408
409 @Override
410 public Promise<V> syncUninterruptibly() {
411 awaitUninterruptibly();
412 rethrowIfFailed();
413 return this;
414 }
415
416 @Override
417 public String toString() {
418 return toStringBuilder().toString();
419 }
420
421 protected StringBuilder toStringBuilder() {
422 StringBuilder buf = new StringBuilder(64)
423 .append(StringUtil.simpleClassName(this))
424 .append('@')
425 .append(Integer.toHexString(hashCode()));
426
427 Object result = this.result;
428 if (result == SUCCESS) {
429 buf.append("(success)");
430 } else if (result == UNCANCELLABLE) {
431 buf.append("(uncancellable)");
432 } else if (result instanceof CauseHolder) {
433 buf.append("(failure: ")
434 .append(((CauseHolder) result).cause)
435 .append(')');
436 } else if (result != null) {
437 buf.append("(success: ")
438 .append(result)
439 .append(')');
440 } else {
441 buf.append("(incomplete)");
442 }
443
444 return buf;
445 }
446
447
448
449
450
451
452
453
454
455 protected EventExecutor executor() {
456 return executor;
457 }
458
459 protected void checkDeadLock() {
460 EventExecutor e = executor();
461 if (e != null && e.inEventLoop()) {
462 throw new BlockingOperationException(toString());
463 }
464 }
465
466
467
468
469
470
471
472
473
474
475 protected static void notifyListener(
476 EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
477 notifyListenerWithStackOverFlowProtection(
478 checkNotNull(eventExecutor, "eventExecutor"),
479 checkNotNull(future, "future"),
480 checkNotNull(listener, "listener"));
481 }
482
483 private void notifyListeners() {
484 EventExecutor executor = executor();
485 if (executor.inEventLoop()) {
486 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
487 final int stackDepth = threadLocals.futureListenerStackDepth();
488 if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
489 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
490 try {
491 notifyListenersNow();
492 } finally {
493 threadLocals.setFutureListenerStackDepth(stackDepth);
494 }
495 return;
496 }
497 }
498
499 safeExecute(executor, new Runnable() {
500 @Override
501 public void run() {
502 notifyListenersNow();
503 }
504 });
505 }
506
507
508
509
510
511
512 private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
513 final Future<?> future,
514 final GenericFutureListener<?> listener) {
515 if (executor.inEventLoop()) {
516 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
517 final int stackDepth = threadLocals.futureListenerStackDepth();
518 if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
519 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
520 try {
521 notifyListener0(future, listener);
522 } finally {
523 threadLocals.setFutureListenerStackDepth(stackDepth);
524 }
525 return;
526 }
527 }
528
529 safeExecute(executor, new Runnable() {
530 @Override
531 public void run() {
532 notifyListener0(future, listener);
533 }
534 });
535 }
536
537 private void notifyListenersNow() {
538 Object listeners;
539 synchronized (this) {
540
541 if (notifyingListeners || this.listeners == null) {
542 return;
543 }
544 notifyingListeners = true;
545 listeners = this.listeners;
546 this.listeners = null;
547 }
548 for (;;) {
549 if (listeners instanceof DefaultFutureListeners) {
550 notifyListeners0((DefaultFutureListeners) listeners);
551 } else {
552 notifyListener0(this, (GenericFutureListener<?>) listeners);
553 }
554 synchronized (this) {
555 if (this.listeners == null) {
556
557
558 notifyingListeners = false;
559 return;
560 }
561 listeners = this.listeners;
562 this.listeners = null;
563 }
564 }
565 }
566
567 private void notifyListeners0(DefaultFutureListeners listeners) {
568 GenericFutureListener<?>[] a = listeners.listeners();
569 int size = listeners.size();
570 for (int i = 0; i < size; i ++) {
571 notifyListener0(this, a[i]);
572 }
573 }
574
575 @SuppressWarnings({ "unchecked", "rawtypes" })
576 private static void notifyListener0(Future future, GenericFutureListener l) {
577 try {
578 l.operationComplete(future);
579 } catch (Throwable t) {
580 if (logger.isWarnEnabled()) {
581 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
582 }
583 }
584 }
585
586 private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
587 if (listeners == null) {
588 listeners = listener;
589 } else if (listeners instanceof DefaultFutureListeners) {
590 ((DefaultFutureListeners) listeners).add(listener);
591 } else {
592 listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
593 }
594 }
595
596 private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
597 if (listeners instanceof DefaultFutureListeners) {
598 ((DefaultFutureListeners) listeners).remove(listener);
599 } else if (listeners == listener) {
600 listeners = null;
601 }
602 }
603
604 private boolean setSuccess0(V result) {
605 return setValue0(result == null ? SUCCESS : result);
606 }
607
608 private boolean setFailure0(Throwable cause) {
609 return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
610 }
611
612 private boolean setValue0(Object objResult) {
613 if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
614 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
615 if (checkNotifyWaiters()) {
616 notifyListeners();
617 }
618 return true;
619 }
620 return false;
621 }
622
623
624
625
626
627 private synchronized boolean checkNotifyWaiters() {
628 if (waiters > 0) {
629 notifyAll();
630 }
631 return listeners != null;
632 }
633
634 private void incWaiters() {
635 if (waiters == Short.MAX_VALUE) {
636 throw new IllegalStateException("too many waiters: " + this);
637 }
638 ++waiters;
639 }
640
641 private void decWaiters() {
642 --waiters;
643 }
644
645 private void rethrowIfFailed() {
646 Throwable cause = cause();
647 if (cause == null) {
648 return;
649 }
650
651 PlatformDependent.throwException(cause);
652 }
653
654 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
655 if (isDone()) {
656 return true;
657 }
658
659 if (timeoutNanos <= 0) {
660 return isDone();
661 }
662
663 if (interruptable && Thread.interrupted()) {
664 throw new InterruptedException(toString());
665 }
666
667 checkDeadLock();
668
669
670
671 final long startTime = System.nanoTime();
672 synchronized (this) {
673 boolean interrupted = false;
674 try {
675 long waitTime = timeoutNanos;
676 while (!isDone() && waitTime > 0) {
677 incWaiters();
678 try {
679 wait(waitTime / 1000000, (int) (waitTime % 1000000));
680 } catch (InterruptedException e) {
681 if (interruptable) {
682 throw e;
683 } else {
684 interrupted = true;
685 }
686 } finally {
687 decWaiters();
688 }
689
690 if (isDone()) {
691 return true;
692 }
693
694
695 waitTime = timeoutNanos - (System.nanoTime() - startTime);
696 }
697 return isDone();
698 } finally {
699 if (interrupted) {
700 Thread.currentThread().interrupt();
701 }
702 }
703 }
704 }
705
706
707
708
709
710
711
712
713
714
715
716 @SuppressWarnings("unchecked")
717 void notifyProgressiveListeners(final long progress, final long total) {
718 final Object listeners = progressiveListeners();
719 if (listeners == null) {
720 return;
721 }
722
723 final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
724
725 EventExecutor executor = executor();
726 if (executor.inEventLoop()) {
727 if (listeners instanceof GenericProgressiveFutureListener[]) {
728 notifyProgressiveListeners0(
729 self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
730 } else {
731 notifyProgressiveListener0(
732 self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
733 }
734 } else {
735 if (listeners instanceof GenericProgressiveFutureListener[]) {
736 final GenericProgressiveFutureListener<?>[] array =
737 (GenericProgressiveFutureListener<?>[]) listeners;
738 safeExecute(executor, new Runnable() {
739 @Override
740 public void run() {
741 notifyProgressiveListeners0(self, array, progress, total);
742 }
743 });
744 } else {
745 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
746 (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
747 safeExecute(executor, new Runnable() {
748 @Override
749 public void run() {
750 notifyProgressiveListener0(self, l, progress, total);
751 }
752 });
753 }
754 }
755 }
756
757
758
759
760
761 private synchronized Object progressiveListeners() {
762 Object listeners = this.listeners;
763 if (listeners == null) {
764
765 return null;
766 }
767
768 if (listeners instanceof DefaultFutureListeners) {
769
770 DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
771 int progressiveSize = dfl.progressiveSize();
772 switch (progressiveSize) {
773 case 0:
774 return null;
775 case 1:
776 for (GenericFutureListener<?> l: dfl.listeners()) {
777 if (l instanceof GenericProgressiveFutureListener) {
778 return l;
779 }
780 }
781 return null;
782 }
783
784 GenericFutureListener<?>[] array = dfl.listeners();
785 GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
786 for (int i = 0, j = 0; j < progressiveSize; i ++) {
787 GenericFutureListener<?> l = array[i];
788 if (l instanceof GenericProgressiveFutureListener) {
789 copy[j ++] = (GenericProgressiveFutureListener<?>) l;
790 }
791 }
792
793 return copy;
794 } else if (listeners instanceof GenericProgressiveFutureListener) {
795 return listeners;
796 } else {
797
798 return null;
799 }
800 }
801
802 private static void notifyProgressiveListeners0(
803 ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
804 for (GenericProgressiveFutureListener<?> l: listeners) {
805 if (l == null) {
806 break;
807 }
808 notifyProgressiveListener0(future, l, progress, total);
809 }
810 }
811
812 @SuppressWarnings({ "unchecked", "rawtypes" })
813 private static void notifyProgressiveListener0(
814 ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
815 try {
816 l.operationProgressed(future, progress, total);
817 } catch (Throwable t) {
818 if (logger.isWarnEnabled()) {
819 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
820 }
821 }
822 }
823
824 private static boolean isCancelled0(Object result) {
825 return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
826 }
827
828 private static boolean isDone0(Object result) {
829 return result != null && result != UNCANCELLABLE;
830 }
831
832 private static final class CauseHolder {
833 final Throwable cause;
834 CauseHolder(Throwable cause) {
835 this.cause = cause;
836 }
837 }
838
839 private static void safeExecute(EventExecutor executor, Runnable task) {
840 try {
841 executor.execute(task);
842 } catch (Throwable t) {
843 rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
844 }
845 }
846
847 private static final class StacklessCancellationException extends CancellationException {
848
849 private static final long serialVersionUID = -2974906711413716191L;
850
851 private StacklessCancellationException() { }
852
853
854
855 @Override
856 public Throwable fillInStackTrace() {
857 return this;
858 }
859
860 static StacklessCancellationException newInstance(Class<?> clazz, String method) {
861 return ThrowableUtil.unknownStackTrace(new StacklessCancellationException(), clazz, method);
862 }
863 }
864 }