1 /*
2 * Copyright 2016 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.util.concurrent;
17
18 import static java.util.Objects.requireNonNull;
19
20
21 /**
22 * <p>A promise combiner monitors the outcome of a number of discrete futures, then notifies a final, aggregate promise
23 * when all of the combined futures are finished. The aggregate promise will succeed if and only if all of the combined
24 * futures succeed. If any of the combined futures fail, the aggregate promise will fail. The cause failure for the
25 * aggregate promise will be the failure for one of the failed combined futures; if more than one of the combined
26 * futures fails, exactly which cause of failure will be assigned to the aggregate promise is undefined.</p>
27 *
28 * <p>Callers may populate a promise combiner with any number of futures to be combined via the
29 * {@link PromiseCombiner#add(Future)} and {@link PromiseCombiner#addAll(Future[])} methods. When all futures to be
30 * combined have been added, callers must provide an aggregate promise to be notified when all combined promises have
31 * finished via the {@link PromiseCombiner#finish(Promise)} method.</p>
32 *
33 * <p>This implementation is <strong>NOT</strong> thread-safe and all methods must be called
34 * from the {@link EventExecutor} thread.</p>
35 */
36 public final class PromiseCombiner {
37 private int expectedCount;
38 private int doneCount;
39 private Promise<Void> aggregatePromise;
40 private Throwable cause;
41 private final FutureListener<Object> listener = new FutureListener<>() {
42 @Override
43 public void operationComplete(final Future<?> future) {
44 if (executor.inEventLoop()) {
45 operationComplete0(future);
46 } else {
47 executor.execute(() -> operationComplete0(future));
48 }
49 }
50
51 private void operationComplete0(Future<?> future) {
52 assert executor.inEventLoop();
53 ++doneCount;
54 if (future.isFailed() && cause == null) {
55 cause = future.cause();
56 }
57 if (doneCount == expectedCount && aggregatePromise != null) {
58 tryPromise();
59 }
60 }
61 };
62
63 private final EventExecutor executor;
64
65 /**
66 * The {@link EventExecutor} to use for notifications. You must call {@link #add(Future)}, {@link #addAll(Future[])}
67 * and {@link #finish(Promise)} from within the {@link EventExecutor} thread.
68 *
69 * @param executor the {@link EventExecutor} to use for notifications.
70 */
71 public PromiseCombiner(EventExecutor executor) {
72 this.executor = requireNonNull(executor, "executor");
73 }
74
75 /**
76 * Adds a new future to be combined. New futures may be added until an aggregate promise is added via the
77 * {@link PromiseCombiner#finish(Promise)} method.
78 *
79 * @param future the future to add to this promise combiner
80 */
81 public void add(Future<?> future) {
82 checkAddAllowed();
83 checkInEventLoop();
84 ++expectedCount;
85 future.addListener(listener);
86 }
87
88 /**
89 * Adds new futures to be combined. New futures may be added until an aggregate promise is added via the
90 * {@link PromiseCombiner#finish(Promise)} method.
91 *
92 * @param futures the futures to add to this promise combiner
93 */
94 public void addAll(Future<?>... futures) {
95 for (Future<?> future : futures) {
96 add(future);
97 }
98 }
99
100 /**
101 * <p>Sets the promise to be notified when all combined futures have finished. If all combined futures succeed,
102 * then the aggregate promise will succeed. If one or more combined futures fails, then the aggregate promise will
103 * fail with the cause of one of the failed futures. If more than one combined future fails, then exactly which
104 * failure will be assigned to the aggregate promise is undefined.</p>
105 *
106 * <p>After this method is called, no more futures may be added via the {@link PromiseCombiner#add(Future)} or
107 * {@link PromiseCombiner#addAll(Future[])} methods.</p>
108 *
109 * @param aggregatePromise the promise to notify when all combined futures have finished
110 */
111 public void finish(Promise<Void> aggregatePromise) {
112 requireNonNull(aggregatePromise, "aggregatePromise");
113 checkInEventLoop();
114 if (this.aggregatePromise != null) {
115 throw new IllegalStateException("Already finished");
116 }
117 this.aggregatePromise = aggregatePromise;
118 if (doneCount == expectedCount) {
119 tryPromise();
120 }
121 }
122
123 private void checkInEventLoop() {
124 if (!executor.inEventLoop()) {
125 throw new IllegalStateException("Must be called from EventExecutor thread");
126 }
127 }
128
129 private boolean tryPromise() {
130 return cause == null? aggregatePromise.trySuccess(null) : aggregatePromise.tryFailure(cause);
131 }
132
133 private void checkAddAllowed() {
134 if (aggregatePromise != null) {
135 throw new IllegalStateException("Adding promises is not allowed after finished adding");
136 }
137 }
138 }