1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.group;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelFutureListener;
21 import io.netty.util.concurrent.BlockingOperationException;
22 import io.netty.util.concurrent.DefaultPromise;
23 import io.netty.util.concurrent.EventExecutor;
24 import io.netty.util.concurrent.Future;
25 import io.netty.util.concurrent.GenericFutureListener;
26 import io.netty.util.concurrent.ImmediateEventExecutor;
27
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.Iterator;
32 import java.util.LinkedHashMap;
33 import java.util.List;
34 import java.util.Map;
35
36
37
38
39
40 final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements ChannelGroupFuture {
41
42 private final ChannelGroup group;
43 private final Map<Channel, ChannelFuture> futures;
44 private int successCount;
45 private int failureCount;
46
47 private final ChannelFutureListener childListener = new ChannelFutureListener() {
48 @Override
49 public void operationComplete(ChannelFuture future) throws Exception {
50 boolean success = future.isSuccess();
51 boolean callSetDone;
52 synchronized (DefaultChannelGroupFuture.this) {
53 if (success) {
54 successCount ++;
55 } else {
56 failureCount ++;
57 }
58
59 callSetDone = successCount + failureCount == futures.size();
60 assert successCount + failureCount <= futures.size();
61 }
62
63 if (callSetDone) {
64 if (failureCount > 0) {
65 List<Map.Entry<Channel, Throwable>> failed =
66 new ArrayList<Map.Entry<Channel, Throwable>>(failureCount);
67 for (ChannelFuture f: futures.values()) {
68 if (!f.isSuccess()) {
69 failed.add(new DefaultEntry<Channel, Throwable>(f.channel(), f.cause()));
70 }
71 }
72 setFailure0(new ChannelGroupException(failed));
73 } else {
74 setSuccess0();
75 }
76 }
77 }
78 };
79
80
81
82
83 DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures, EventExecutor executor) {
84 super(executor);
85 if (group == null) {
86 throw new NullPointerException("group");
87 }
88 if (futures == null) {
89 throw new NullPointerException("futures");
90 }
91
92 this.group = group;
93
94 Map<Channel, ChannelFuture> futureMap = new LinkedHashMap<Channel, ChannelFuture>();
95 for (ChannelFuture f: futures) {
96 futureMap.put(f.channel(), f);
97 }
98
99 this.futures = Collections.unmodifiableMap(futureMap);
100
101 for (ChannelFuture f: this.futures.values()) {
102 f.addListener(childListener);
103 }
104
105
106 if (this.futures.isEmpty()) {
107 setSuccess0();
108 }
109 }
110
111 DefaultChannelGroupFuture(ChannelGroup group, Map<Channel, ChannelFuture> futures, EventExecutor executor) {
112 super(executor);
113 this.group = group;
114 this.futures = Collections.unmodifiableMap(futures);
115 for (ChannelFuture f: this.futures.values()) {
116 f.addListener(childListener);
117 }
118
119
120 if (this.futures.isEmpty()) {
121 setSuccess0();
122 }
123 }
124
125 @Override
126 public ChannelGroup group() {
127 return group;
128 }
129
130 @Override
131 public ChannelFuture find(Channel channel) {
132 return futures.get(channel);
133 }
134
135 @Override
136 public Iterator<ChannelFuture> iterator() {
137 return futures.values().iterator();
138 }
139
140 @Override
141 public synchronized boolean isPartialSuccess() {
142 return successCount != 0 && successCount != futures.size();
143 }
144
145 @Override
146 public synchronized boolean isPartialFailure() {
147 return failureCount != 0 && failureCount != futures.size();
148 }
149
150 @Override
151 public DefaultChannelGroupFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
152 super.addListener(listener);
153 return this;
154 }
155
156 @Override
157 public DefaultChannelGroupFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
158 super.addListeners(listeners);
159 return this;
160 }
161
162 @Override
163 public DefaultChannelGroupFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
164 super.removeListener(listener);
165 return this;
166 }
167
168 @Override
169 public DefaultChannelGroupFuture removeListeners(
170 GenericFutureListener<? extends Future<? super Void>>... listeners) {
171 super.removeListeners(listeners);
172 return this;
173 }
174
175 @Override
176 public DefaultChannelGroupFuture await() throws InterruptedException {
177 super.await();
178 return this;
179 }
180
181 @Override
182 public DefaultChannelGroupFuture awaitUninterruptibly() {
183 super.awaitUninterruptibly();
184 return this;
185 }
186
187 @Override
188 public DefaultChannelGroupFuture syncUninterruptibly() {
189 super.syncUninterruptibly();
190 return this;
191 }
192
193 @Override
194 public DefaultChannelGroupFuture sync() throws InterruptedException {
195 super.sync();
196 return this;
197 }
198
199 @Override
200 public ChannelGroupException cause() {
201 return (ChannelGroupException) super.cause();
202 }
203
204 private void setSuccess0() {
205 super.setSuccess(null);
206 }
207
208 private void setFailure0(ChannelGroupException cause) {
209 super.setFailure(cause);
210 }
211
212 @Override
213 public DefaultChannelGroupFuture setSuccess(Void result) {
214 throw new IllegalStateException();
215 }
216
217 @Override
218 public boolean trySuccess(Void result) {
219 throw new IllegalStateException();
220 }
221
222 @Override
223 public DefaultChannelGroupFuture setFailure(Throwable cause) {
224 throw new IllegalStateException();
225 }
226
227 @Override
228 public boolean tryFailure(Throwable cause) {
229 throw new IllegalStateException();
230 }
231
232 @Override
233 protected void checkDeadLock() {
234 EventExecutor e = executor();
235 if (e != null && e != ImmediateEventExecutor.INSTANCE && e.inEventLoop()) {
236 throw new BlockingOperationException();
237 }
238 }
239
240 private static final class DefaultEntry<K, V> implements Map.Entry<K, V> {
241 private final K key;
242 private final V value;
243
244 DefaultEntry(K key, V value) {
245 this.key = key;
246 this.value = value;
247 }
248
249 @Override
250 public K getKey() {
251 return key;
252 }
253
254 @Override
255 public V getValue() {
256 return value;
257 }
258
259 @Override
260 public V setValue(V value) {
261 throw new UnsupportedOperationException("read-only");
262 }
263 }
264 }