1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.Signal;
19 import io.netty.util.internal.InternalThreadLocalMap;
20 import io.netty.util.internal.PlatformDependent;
21 import io.netty.util.internal.StringUtil;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.ThrowableUtil;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26
27 import java.util.concurrent.CancellationException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
30
31 import static io.netty.util.internal.ObjectUtil.checkNotNull;
32 import static java.util.concurrent.TimeUnit.MILLISECONDS;
33
34 public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// Reports exceptions thrown by listeners while they are being notified.
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
// Separate logger (this class' name suffixed with ".rejectedExecution") used by safeExecute(...)
// when a notification task cannot be handed to the executor.
private static final InternalLogger rejectedExecutionLogger =
        InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
// Listeners are invoked directly on the event loop only while the per-thread nesting depth is below this
// value. Because of Math.min, the system property can lower the limit but never raise it above 8.
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
        SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// Performs the compare-and-set transitions on the volatile "result" field.
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// Sentinel stored in "result" when the promise succeeds with a null value (see setSuccess0).
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
// Sentinel stored in "result" by setUncancellable(); isDone0 treats it as "not done yet".
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class.getName() + ".UNCANCELLABLE");
// Shared holder stored in "result" by cancel(...); every cancelled promise reuses this one instance.
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
        new CancellationException(), DefaultPromise.class, "cancel(...)"));

// State of the promise: null (incomplete), UNCANCELLABLE, SUCCESS, a CauseHolder (failed or cancelled),
// or the non-null success value itself.
private volatile Object result;
// Executor used for listener notification and dead-lock checks; null when the no-arg constructor was used.
private final EventExecutor executor;

// Registered listeners: null, a single GenericFutureListener, or a DefaultFutureListeners once a second
// listener is added (see addListener0). Read and written inside synchronized (this) sections;
// notifyListenersNow() resets it to null when it takes the listeners for notification.
private Object listeners;

// Number of threads currently blocked in wait() on this promise; updated by incWaiters()/decWaiters(),
// which the await methods call while holding this object's monitor.
private short waiters;

// True while notifyListenersNow() is delivering notifications, so that a second, overlapping call
// returns immediately instead of notifying concurrently. Accessed inside synchronized (this) sections.
private boolean notifyingListeners;
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
 * Creates a promise bound to the given executor.
 *
 * @param executor the {@link EventExecutor} stored for later listener notification; it is validated with
 *                 {@code checkNotNull} before being stored
 */
public DefaultPromise(EventExecutor executor) {
    checkNotNull(executor, "executor");
    this.executor = executor;
}
83
84
85
86
/**
 * Creates a promise without an executor.
 * NOTE(review): presumably subclasses using this constructor override {@link #executor()}, because
 * {@code notifyListeners()} dereferences the value returned by that method - confirm against subclasses.
 */
protected DefaultPromise() {
    // No executor is associated with this instance.
    this.executor = null;
}
91
92 @Override
93 public Promise<V> setSuccess(V result) {
94 if (setSuccess0(result)) {
95 notifyListeners();
96 return this;
97 }
98 throw new IllegalStateException("complete already: " + this);
99 }
100
101 @Override
102 public boolean trySuccess(V result) {
103 if (setSuccess0(result)) {
104 notifyListeners();
105 return true;
106 }
107 return false;
108 }
109
110 @Override
111 public Promise<V> setFailure(Throwable cause) {
112 if (setFailure0(cause)) {
113 notifyListeners();
114 return this;
115 }
116 throw new IllegalStateException("complete already: " + this, cause);
117 }
118
119 @Override
120 public boolean tryFailure(Throwable cause) {
121 if (setFailure0(cause)) {
122 notifyListeners();
123 return true;
124 }
125 return false;
126 }
127
128 @Override
129 public boolean setUncancellable() {
130 if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
131 return true;
132 }
133 Object result = this.result;
134 return !isDone0(result) || !isCancelled0(result);
135 }
136
137 @Override
138 public boolean isSuccess() {
139 Object result = this.result;
140 return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
141 }
142
143 @Override
144 public boolean isCancellable() {
145 return result == null;
146 }
147
148 @Override
149 public Throwable cause() {
150 Object result = this.result;
151 return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
152 }
153
154 @Override
155 public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
156 checkNotNull(listener, "listener");
157
158 synchronized (this) {
159 addListener0(listener);
160 }
161
162 if (isDone()) {
163 notifyListeners();
164 }
165
166 return this;
167 }
168
169 @Override
170 public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
171 checkNotNull(listeners, "listeners");
172
173 synchronized (this) {
174 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
175 if (listener == null) {
176 break;
177 }
178 addListener0(listener);
179 }
180 }
181
182 if (isDone()) {
183 notifyListeners();
184 }
185
186 return this;
187 }
188
189 @Override
190 public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
191 checkNotNull(listener, "listener");
192
193 synchronized (this) {
194 removeListener0(listener);
195 }
196
197 return this;
198 }
199
200 @Override
201 public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
202 checkNotNull(listeners, "listeners");
203
204 synchronized (this) {
205 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
206 if (listener == null) {
207 break;
208 }
209 removeListener0(listener);
210 }
211 }
212
213 return this;
214 }
215
216 @Override
217 public Promise<V> await() throws InterruptedException {
218 if (isDone()) {
219 return this;
220 }
221
222 if (Thread.interrupted()) {
223 throw new InterruptedException(toString());
224 }
225
226 checkDeadLock();
227
228 synchronized (this) {
229 while (!isDone()) {
230 incWaiters();
231 try {
232 wait();
233 } finally {
234 decWaiters();
235 }
236 }
237 }
238 return this;
239 }
240
241 @Override
242 public Promise<V> awaitUninterruptibly() {
243 if (isDone()) {
244 return this;
245 }
246
247 checkDeadLock();
248
249 boolean interrupted = false;
250 synchronized (this) {
251 while (!isDone()) {
252 incWaiters();
253 try {
254 wait();
255 } catch (InterruptedException e) {
256
257 interrupted = true;
258 } finally {
259 decWaiters();
260 }
261 }
262 }
263
264 if (interrupted) {
265 Thread.currentThread().interrupt();
266 }
267
268 return this;
269 }
270
271 @Override
272 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
273 return await0(unit.toNanos(timeout), true);
274 }
275
276 @Override
277 public boolean await(long timeoutMillis) throws InterruptedException {
278 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
279 }
280
281 @Override
282 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
283 try {
284 return await0(unit.toNanos(timeout), false);
285 } catch (InterruptedException e) {
286
287 throw new InternalError();
288 }
289 }
290
291 @Override
292 public boolean awaitUninterruptibly(long timeoutMillis) {
293 try {
294 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
295 } catch (InterruptedException e) {
296
297 throw new InternalError();
298 }
299 }
300
301 @SuppressWarnings("unchecked")
302 @Override
303 public V getNow() {
304 Object result = this.result;
305 if (result instanceof CauseHolder || result == SUCCESS) {
306 return null;
307 }
308 return (V) result;
309 }
310
311
312
313
314
315
316 @Override
317 public boolean cancel(boolean mayInterruptIfRunning) {
318 if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
319 checkNotifyWaiters();
320 notifyListeners();
321 return true;
322 }
323 return false;
324 }
325
326 @Override
327 public boolean isCancelled() {
328 return isCancelled0(result);
329 }
330
331 @Override
332 public boolean isDone() {
333 return isDone0(result);
334 }
335
336 @Override
337 public Promise<V> sync() throws InterruptedException {
338 await();
339 rethrowIfFailed();
340 return this;
341 }
342
343 @Override
344 public Promise<V> syncUninterruptibly() {
345 awaitUninterruptibly();
346 rethrowIfFailed();
347 return this;
348 }
349
350 @Override
351 public String toString() {
352 return toStringBuilder().toString();
353 }
354
355 protected StringBuilder toStringBuilder() {
356 StringBuilder buf = new StringBuilder(64)
357 .append(StringUtil.simpleClassName(this))
358 .append('@')
359 .append(Integer.toHexString(hashCode()));
360
361 Object result = this.result;
362 if (result == SUCCESS) {
363 buf.append("(success)");
364 } else if (result == UNCANCELLABLE) {
365 buf.append("(uncancellable)");
366 } else if (result instanceof CauseHolder) {
367 buf.append("(failure: ")
368 .append(((CauseHolder) result).cause)
369 .append(')');
370 } else if (result != null) {
371 buf.append("(success: ")
372 .append(result)
373 .append(')');
374 } else {
375 buf.append("(incomplete)");
376 }
377
378 return buf;
379 }
380
381
382
383
384
385
386
387
388
389 protected EventExecutor executor() {
390 return executor;
391 }
392
393 protected void checkDeadLock() {
394 EventExecutor e = executor();
395 if (e != null && e.inEventLoop()) {
396 throw new BlockingOperationException(toString());
397 }
398 }
399
400
401
402
403
404
405
406
407
408
409 protected static void notifyListener(
410 EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
411 checkNotNull(eventExecutor, "eventExecutor");
412 checkNotNull(future, "future");
413 checkNotNull(listener, "listener");
414 notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener);
415 }
416
417 private void notifyListeners() {
418 EventExecutor executor = executor();
419 if (executor.inEventLoop()) {
420 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
421 final int stackDepth = threadLocals.futureListenerStackDepth();
422 if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
423 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
424 try {
425 notifyListenersNow();
426 } finally {
427 threadLocals.setFutureListenerStackDepth(stackDepth);
428 }
429 return;
430 }
431 }
432
433 safeExecute(executor, new Runnable() {
434 @Override
435 public void run() {
436 notifyListenersNow();
437 }
438 });
439 }
440
441
442
443
444
445
446 private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
447 final Future<?> future,
448 final GenericFutureListener<?> listener) {
449 if (executor.inEventLoop()) {
450 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
451 final int stackDepth = threadLocals.futureListenerStackDepth();
452 if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
453 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
454 try {
455 notifyListener0(future, listener);
456 } finally {
457 threadLocals.setFutureListenerStackDepth(stackDepth);
458 }
459 return;
460 }
461 }
462
463 safeExecute(executor, new Runnable() {
464 @Override
465 public void run() {
466 notifyListener0(future, listener);
467 }
468 });
469 }
470
471 private void notifyListenersNow() {
472 Object listeners;
473 synchronized (this) {
474
475 if (notifyingListeners || this.listeners == null) {
476 return;
477 }
478 notifyingListeners = true;
479 listeners = this.listeners;
480 this.listeners = null;
481 }
482 for (;;) {
483 if (listeners instanceof DefaultFutureListeners) {
484 notifyListeners0((DefaultFutureListeners) listeners);
485 } else {
486 notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
487 }
488 synchronized (this) {
489 if (this.listeners == null) {
490
491
492 notifyingListeners = false;
493 return;
494 }
495 listeners = this.listeners;
496 this.listeners = null;
497 }
498 }
499 }
500
501 private void notifyListeners0(DefaultFutureListeners listeners) {
502 GenericFutureListener<?>[] a = listeners.listeners();
503 int size = listeners.size();
504 for (int i = 0; i < size; i ++) {
505 notifyListener0(this, a[i]);
506 }
507 }
508
509 @SuppressWarnings({ "unchecked", "rawtypes" })
510 private static void notifyListener0(Future future, GenericFutureListener l) {
511 try {
512 l.operationComplete(future);
513 } catch (Throwable t) {
514 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
515 }
516 }
517
518 private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
519 if (listeners == null) {
520 listeners = listener;
521 } else if (listeners instanceof DefaultFutureListeners) {
522 ((DefaultFutureListeners) listeners).add(listener);
523 } else {
524 listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
525 }
526 }
527
528 private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
529 if (listeners instanceof DefaultFutureListeners) {
530 ((DefaultFutureListeners) listeners).remove(listener);
531 } else if (listeners == listener) {
532 listeners = null;
533 }
534 }
535
536 private boolean setSuccess0(V result) {
537 return setValue0(result == null ? SUCCESS : result);
538 }
539
540 private boolean setFailure0(Throwable cause) {
541 return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
542 }
543
544 private boolean setValue0(Object objResult) {
545 if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
546 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
547 checkNotifyWaiters();
548 return true;
549 }
550 return false;
551 }
552
553 private synchronized void checkNotifyWaiters() {
554 if (waiters > 0) {
555 notifyAll();
556 }
557 }
558
559 private void incWaiters() {
560 if (waiters == Short.MAX_VALUE) {
561 throw new IllegalStateException("too many waiters: " + this);
562 }
563 ++waiters;
564 }
565
566 private void decWaiters() {
567 --waiters;
568 }
569
570 private void rethrowIfFailed() {
571 Throwable cause = cause();
572 if (cause == null) {
573 return;
574 }
575
576 PlatformDependent.throwException(cause);
577 }
578
579 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
580 if (isDone()) {
581 return true;
582 }
583
584 if (timeoutNanos <= 0) {
585 return isDone();
586 }
587
588 if (interruptable && Thread.interrupted()) {
589 throw new InterruptedException(toString());
590 }
591
592 checkDeadLock();
593
594 long startTime = System.nanoTime();
595 long waitTime = timeoutNanos;
596 boolean interrupted = false;
597 try {
598 for (;;) {
599 synchronized (this) {
600 if (isDone()) {
601 return true;
602 }
603 incWaiters();
604 try {
605 wait(waitTime / 1000000, (int) (waitTime % 1000000));
606 } catch (InterruptedException e) {
607 if (interruptable) {
608 throw e;
609 } else {
610 interrupted = true;
611 }
612 } finally {
613 decWaiters();
614 }
615 }
616 if (isDone()) {
617 return true;
618 } else {
619 waitTime = timeoutNanos - (System.nanoTime() - startTime);
620 if (waitTime <= 0) {
621 return isDone();
622 }
623 }
624 }
625 } finally {
626 if (interrupted) {
627 Thread.currentThread().interrupt();
628 }
629 }
630 }
631
632
633
634
635
636
637
638
639
640
641
642 @SuppressWarnings("unchecked")
643 void notifyProgressiveListeners(final long progress, final long total) {
644 final Object listeners = progressiveListeners();
645 if (listeners == null) {
646 return;
647 }
648
649 final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
650
651 EventExecutor executor = executor();
652 if (executor.inEventLoop()) {
653 if (listeners instanceof GenericProgressiveFutureListener[]) {
654 notifyProgressiveListeners0(
655 self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
656 } else {
657 notifyProgressiveListener0(
658 self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
659 }
660 } else {
661 if (listeners instanceof GenericProgressiveFutureListener[]) {
662 final GenericProgressiveFutureListener<?>[] array =
663 (GenericProgressiveFutureListener<?>[]) listeners;
664 safeExecute(executor, new Runnable() {
665 @Override
666 public void run() {
667 notifyProgressiveListeners0(self, array, progress, total);
668 }
669 });
670 } else {
671 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
672 (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
673 safeExecute(executor, new Runnable() {
674 @Override
675 public void run() {
676 notifyProgressiveListener0(self, l, progress, total);
677 }
678 });
679 }
680 }
681 }
682
683
684
685
686
687 private synchronized Object progressiveListeners() {
688 Object listeners = this.listeners;
689 if (listeners == null) {
690
691 return null;
692 }
693
694 if (listeners instanceof DefaultFutureListeners) {
695
696 DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
697 int progressiveSize = dfl.progressiveSize();
698 switch (progressiveSize) {
699 case 0:
700 return null;
701 case 1:
702 for (GenericFutureListener<?> l: dfl.listeners()) {
703 if (l instanceof GenericProgressiveFutureListener) {
704 return l;
705 }
706 }
707 return null;
708 }
709
710 GenericFutureListener<?>[] array = dfl.listeners();
711 GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
712 for (int i = 0, j = 0; j < progressiveSize; i ++) {
713 GenericFutureListener<?> l = array[i];
714 if (l instanceof GenericProgressiveFutureListener) {
715 copy[j ++] = (GenericProgressiveFutureListener<?>) l;
716 }
717 }
718
719 return copy;
720 } else if (listeners instanceof GenericProgressiveFutureListener) {
721 return listeners;
722 } else {
723
724 return null;
725 }
726 }
727
728 private static void notifyProgressiveListeners0(
729 ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
730 for (GenericProgressiveFutureListener<?> l: listeners) {
731 if (l == null) {
732 break;
733 }
734 notifyProgressiveListener0(future, l, progress, total);
735 }
736 }
737
738 @SuppressWarnings({ "unchecked", "rawtypes" })
739 private static void notifyProgressiveListener0(
740 ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
741 try {
742 l.operationProgressed(future, progress, total);
743 } catch (Throwable t) {
744 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
745 }
746 }
747
748 private static boolean isCancelled0(Object result) {
749 return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
750 }
751
752 private static boolean isDone0(Object result) {
753 return result != null && result != UNCANCELLABLE;
754 }
755
756 private static final class CauseHolder {
757 final Throwable cause;
758 CauseHolder(Throwable cause) {
759 this.cause = cause;
760 }
761 }
762
763 private static void safeExecute(EventExecutor executor, Runnable task) {
764 try {
765 executor.execute(task);
766 } catch (Throwable t) {
767 rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
768 }
769 }
770 }