View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util.concurrent;
17  
18  import static java.util.Objects.requireNonNull;
19  
20  
21  /**
22   * <p>A promise combiner monitors the outcome of a number of discrete futures, then notifies a final, aggregate promise
23   * when all of the combined futures are finished. The aggregate promise will succeed if and only if all of the combined
24   * futures succeed. If any of the combined futures fail, the aggregate promise will fail. The cause failure for the
25   * aggregate promise will be the failure for one of the failed combined futures; if more than one of the combined
26   * futures fails, exactly which cause of failure will be assigned to the aggregate promise is undefined.</p>
27   *
28   * <p>Callers may populate a promise combiner with any number of futures to be combined via the
29   * {@link PromiseCombiner#add(Future)} and {@link PromiseCombiner#addAll(Future[])} methods. When all futures to be
30   * combined have been added, callers must provide an aggregate promise to be notified when all combined promises have
31   * finished via the {@link PromiseCombiner#finish(Promise)} method.</p>
32   *
33   * <p>This implementation is <strong>NOT</strong> thread-safe and all methods must be called
34   * from the {@link EventExecutor} thread.</p>
35   */
36  public final class PromiseCombiner {
37      private int expectedCount;
38      private int doneCount;
39      private Promise<Void> aggregatePromise;
40      private Throwable cause;
41      private final FutureListener<Object> listener = new FutureListener<>() {
42          @Override
43          public void operationComplete(final Future<?> future) {
44              if (executor.inEventLoop()) {
45                  operationComplete0(future);
46              } else {
47                  executor.execute(() -> operationComplete0(future));
48              }
49          }
50  
51          private void operationComplete0(Future<?> future) {
52              assert executor.inEventLoop();
53              ++doneCount;
54              if (future.isFailed() && cause == null) {
55                  cause = future.cause();
56              }
57              if (doneCount == expectedCount && aggregatePromise != null) {
58                  tryPromise();
59              }
60          }
61      };
62  
63      private final EventExecutor executor;
64  
65      /**
66       * The {@link EventExecutor} to use for notifications. You must call {@link #add(Future)}, {@link #addAll(Future[])}
67       * and {@link #finish(Promise)} from within the {@link EventExecutor} thread.
68       *
69       * @param executor the {@link EventExecutor} to use for notifications.
70       */
71      public PromiseCombiner(EventExecutor executor) {
72          this.executor = requireNonNull(executor, "executor");
73      }
74  
75      /**
76       * Adds a new future to be combined. New futures may be added until an aggregate promise is added via the
77       * {@link PromiseCombiner#finish(Promise)} method.
78       *
79       * @param future the future to add to this promise combiner
80       */
81      public void add(Future<?> future) {
82          checkAddAllowed();
83          checkInEventLoop();
84          ++expectedCount;
85          future.addListener(listener);
86      }
87  
88      /**
89       * Adds new futures to be combined. New futures may be added until an aggregate promise is added via the
90       * {@link PromiseCombiner#finish(Promise)} method.
91       *
92       * @param futures the futures to add to this promise combiner
93       */
94      public void addAll(Future<?>... futures) {
95          for (Future<?> future : futures) {
96              add(future);
97          }
98      }
99  
100     /**
101      * <p>Sets the promise to be notified when all combined futures have finished. If all combined futures succeed,
102      * then the aggregate promise will succeed. If one or more combined futures fails, then the aggregate promise will
103      * fail with the cause of one of the failed futures. If more than one combined future fails, then exactly which
104      * failure will be assigned to the aggregate promise is undefined.</p>
105      *
106      * <p>After this method is called, no more futures may be added via the {@link PromiseCombiner#add(Future)} or
107      * {@link PromiseCombiner#addAll(Future[])} methods.</p>
108      *
109      * @param aggregatePromise the promise to notify when all combined futures have finished
110      */
111     public void finish(Promise<Void> aggregatePromise) {
112         requireNonNull(aggregatePromise, "aggregatePromise");
113         checkInEventLoop();
114         if (this.aggregatePromise != null) {
115             throw new IllegalStateException("Already finished");
116         }
117         this.aggregatePromise = aggregatePromise;
118         if (doneCount == expectedCount) {
119             tryPromise();
120         }
121     }
122 
123     private void checkInEventLoop() {
124         if (!executor.inEventLoop()) {
125             throw new IllegalStateException("Must be called from EventExecutor thread");
126         }
127     }
128 
129     private boolean tryPromise() {
130         return cause == null? aggregatePromise.trySuccess(null) : aggregatePromise.tryFailure(cause);
131     }
132 
133     private void checkAddAllowed() {
134         if (aggregatePromise != null) {
135             throw new IllegalStateException("Adding promises is not allowed after finished adding");
136         }
137     }
138 }