1 /* 2 * Copyright 2016 The Netty Project 3 * 4 * The Netty Project licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * https://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package io.netty5.util.concurrent; 17 18 import static java.util.Objects.requireNonNull; 19 20 21 /** 22 * <p>A promise combiner monitors the outcome of a number of discrete futures, then notifies a final, aggregate promise 23 * when all of the combined futures are finished. The aggregate promise will succeed if and only if all of the combined 24 * futures succeed. If any of the combined futures fail, the aggregate promise will fail. The cause failure for the 25 * aggregate promise will be the failure for one of the failed combined futures; if more than one of the combined 26 * futures fails, exactly which cause of failure will be assigned to the aggregate promise is undefined.</p> 27 * 28 * <p>Callers may populate a promise combiner with any number of futures to be combined via the 29 * {@link PromiseCombiner#add(Future)} and {@link PromiseCombiner#addAll(Future[])} methods. When all futures to be 30 * combined have been added, callers must provide an aggregate promise to be notified when all combined promises have 31 * finished via the {@link PromiseCombiner#finish(Promise)} method.</p> 32 * 33 * <p>This implementation is <strong>NOT</strong> thread-safe and all methods must be called 34 * from the {@link EventExecutor} thread.</p> 35 */ 36 public final class PromiseCombiner { 37 private int expectedCount; 38 private int doneCount; 39 private Promise<Void> aggregatePromise; 40 private Throwable cause; 41 private final FutureListener<Object> listener = new FutureListener<>() { 42 @Override 43 public void operationComplete(final Future<?> future) { 44 if (executor.inEventLoop()) { 45 operationComplete0(future); 46 } else { 47 executor.execute(() -> operationComplete0(future)); 48 } 49 } 50 51 private void operationComplete0(Future<?> future) { 52 assert executor.inEventLoop(); 53 ++doneCount; 54 if (future.isFailed() && cause == null) { 55 cause = future.cause(); 56 } 57 if (doneCount == expectedCount && aggregatePromise != null) { 58 tryPromise(); 59 } 60 } 61 }; 62 63 private final EventExecutor executor; 64 65 /** 66 * The {@link EventExecutor} to use for notifications. You must call {@link #add(Future)}, {@link #addAll(Future[])} 67 * and {@link #finish(Promise)} from within the {@link EventExecutor} thread. 68 * 69 * @param executor the {@link EventExecutor} to use for notifications. 70 */ 71 public PromiseCombiner(EventExecutor executor) { 72 this.executor = requireNonNull(executor, "executor"); 73 } 74 75 /** 76 * Adds a new future to be combined. New futures may be added until an aggregate promise is added via the 77 * {@link PromiseCombiner#finish(Promise)} method. 78 * 79 * @param future the future to add to this promise combiner 80 */ 81 public void add(Future<?> future) { 82 checkAddAllowed(); 83 checkInEventLoop(); 84 ++expectedCount; 85 future.addListener(listener); 86 } 87 88 /** 89 * Adds new futures to be combined. New futures may be added until an aggregate promise is added via the 90 * {@link PromiseCombiner#finish(Promise)} method. 91 * 92 * @param futures the futures to add to this promise combiner 93 */ 94 public void addAll(Future<?>... futures) { 95 for (Future<?> future : futures) { 96 add(future); 97 } 98 } 99 100 /** 101 * <p>Sets the promise to be notified when all combined futures have finished. If all combined futures succeed, 102 * then the aggregate promise will succeed. If one or more combined futures fails, then the aggregate promise will 103 * fail with the cause of one of the failed futures. If more than one combined future fails, then exactly which 104 * failure will be assigned to the aggregate promise is undefined.</p> 105 * 106 * <p>After this method is called, no more futures may be added via the {@link PromiseCombiner#add(Future)} or 107 * {@link PromiseCombiner#addAll(Future[])} methods.</p> 108 * 109 * @param aggregatePromise the promise to notify when all combined futures have finished 110 */ 111 public void finish(Promise<Void> aggregatePromise) { 112 requireNonNull(aggregatePromise, "aggregatePromise"); 113 checkInEventLoop(); 114 if (this.aggregatePromise != null) { 115 throw new IllegalStateException("Already finished"); 116 } 117 this.aggregatePromise = aggregatePromise; 118 if (doneCount == expectedCount) { 119 tryPromise(); 120 } 121 } 122 123 private void checkInEventLoop() { 124 if (!executor.inEventLoop()) { 125 throw new IllegalStateException("Must be called from EventExecutor thread"); 126 } 127 } 128 129 private boolean tryPromise() { 130 return cause == null? aggregatePromise.trySuccess(null) : aggregatePromise.tryFailure(cause); 131 } 132 133 private void checkAddAllowed() { 134 if (aggregatePromise != null) { 135 throw new IllegalStateException("Adding promises is not allowed after finished adding"); 136 } 137 } 138 }