View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  
20  /**
21   * <p>A promise combiner monitors the outcome of a number of discrete futures, then notifies a final, aggregate promise
22   * when all of the combined futures are finished. The aggregate promise will succeed if and only if all of the combined
23   * futures succeed. If any of the combined futures fail, the aggregate promise will fail. The cause failure for the
24   * aggregate promise will be the failure for one of the failed combined futures; if more than one of the combined
25   * futures fails, exactly which cause of failure will be assigned to the aggregate promise is undefined.</p>
26   *
27   * <p>Callers may populate a promise combiner with any number of futures to be combined via the
28   * {@link PromiseCombiner#add(Future)} and {@link PromiseCombiner#addAll(Future[])} methods. When all futures to be
29   * combined have been added, callers must provide an aggregate promise to be notified when all combined promises have
30   * finished via the {@link PromiseCombiner#finish(Promise)} method.</p>
31   */
32  public final class PromiseCombiner {
33      private int expectedCount;
34      private int doneCount;
35      private boolean doneAdding;
36      private Promise<Void> aggregatePromise;
37      private Throwable cause;
38      private final GenericFutureListener<Future<?>> listener = new GenericFutureListener<Future<?>>() {
39          @Override
40          public void operationComplete(Future<?> future) throws Exception {
41              ++doneCount;
42              if (!future.isSuccess() && cause == null) {
43                  cause = future.cause();
44              }
45              if (doneCount == expectedCount && doneAdding) {
46                  tryPromise();
47              }
48          }
49      };
50  
51      /**
52       * Adds a new promise to be combined. New promises may be added until an aggregate promise is added via the
53       * {@link PromiseCombiner#finish(Promise)} method.
54       *
55       * @param promise the promise to add to this promise combiner
56       *
57       * @deprecated Replaced by {@link PromiseCombiner#add(Future)}.
58       */
59      @Deprecated
60      public void add(Promise promise) {
61          add((Future) promise);
62      }
63  
64      /**
65       * Adds a new future to be combined. New futures may be added until an aggregate promise is added via the
66       * {@link PromiseCombiner#finish(Promise)} method.
67       *
68       * @param future the future to add to this promise combiner
69       */
70      @SuppressWarnings({ "unchecked", "rawtypes" })
71      public void add(Future future) {
72          checkAddAllowed();
73          ++expectedCount;
74          future.addListener(listener);
75      }
76  
77      /**
78       * Adds new promises to be combined. New promises may be added until an aggregate promise is added via the
79       * {@link PromiseCombiner#finish(Promise)} method.
80       *
81       * @param promises the promises to add to this promise combiner
82       *
83       * @deprecated Replaced by {@link PromiseCombiner#addAll(Future[])}
84       */
85      @Deprecated
86      public void addAll(Promise... promises) {
87          addAll((Future[]) promises);
88      }
89  
90      /**
91       * Adds new futures to be combined. New futures may be added until an aggregate promise is added via the
92       * {@link PromiseCombiner#finish(Promise)} method.
93       *
94       * @param futures the futures to add to this promise combiner
95       */
96      @SuppressWarnings({ "unchecked", "rawtypes" })
97      public void addAll(Future... futures) {
98          for (Future future : futures) {
99              this.add(future);
100         }
101     }
102 
103     /**
104      * <p>Sets the promise to be notified when all combined futures have finished. If all combined futures succeed,
105      * then the aggregate promise will succeed. If one or more combined futures fails, then the aggregate promise will
106      * fail with the cause of one of the failed futures. If more than one combined future fails, then exactly which
107      * failure will be assigned to the aggregate promise is undefined.</p>
108      *
109      * <p>After this method is called, no more futures may be added via the {@link PromiseCombiner#add(Future)} or
110      * {@link PromiseCombiner#addAll(Future[])} methods.</p>
111      *
112      * @param aggregatePromise the promise to notify when all combined futures have finished
113      */
114     public void finish(Promise<Void> aggregatePromise) {
115         if (doneAdding) {
116             throw new IllegalStateException("Already finished");
117         }
118         doneAdding = true;
119         this.aggregatePromise = ObjectUtil.checkNotNull(aggregatePromise, "aggregatePromise");
120         if (doneCount == expectedCount) {
121             tryPromise();
122         }
123     }
124 
125     private boolean tryPromise() {
126         return (cause == null) ? aggregatePromise.trySuccess(null) : aggregatePromise.tryFailure(cause);
127     }
128 
129     private void checkAddAllowed() {
130         if (doneAdding) {
131             throw new IllegalStateException("Adding promises is not allowed after finished adding");
132         }
133     }
134 }