1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19
20 public final class PromiseCombiner {
21 private int expectedCount;
22 private int doneCount;
23 private boolean doneAdding;
24 private Promise<Void> aggregatePromise;
25 private Throwable cause;
26 private final GenericFutureListener<Future<?>> listener = new GenericFutureListener<Future<?>>() {
27 @Override
28 public void operationComplete(Future<?> future) throws Exception {
29 ++doneCount;
30 if (!future.isSuccess() && cause == null) {
31 cause = future.cause();
32 }
33 if (doneCount == expectedCount && doneAdding) {
34 tryPromise();
35 }
36 }
37 };
38
39 @SuppressWarnings({ "unchecked", "rawtypes" })
40 public void add(Promise promise) {
41 checkAddAllowed();
42 ++expectedCount;
43 promise.addListener(listener);
44 }
45
46 @SuppressWarnings({ "unchecked", "rawtypes" })
47 public void addAll(Promise... promises) {
48 checkAddAllowed();
49 expectedCount += promises.length;
50 for (Promise promise : promises) {
51 promise.addListener(listener);
52 }
53 }
54
55 public void finish(Promise<Void> aggregatePromise) {
56 if (doneAdding) {
57 throw new IllegalStateException("Already finished");
58 }
59 doneAdding = true;
60 this.aggregatePromise = ObjectUtil.checkNotNull(aggregatePromise, "aggregatePromise");
61 if (doneCount == expectedCount) {
62 tryPromise();
63 }
64 }
65
66 private boolean tryPromise() {
67 return (cause == null) ? aggregatePromise.trySuccess(null) : aggregatePromise.tryFailure(cause);
68 }
69
70 private void checkAddAllowed() {
71 if (doneAdding) {
72 throw new IllegalStateException("Adding promises is not allowed after finished adding");
73 }
74 }
75 }