1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.group;
17
18 import io.netty5.channel.Channel;
19 import io.netty5.util.concurrent.BlockingOperationException;
20 import io.netty5.util.concurrent.DefaultPromise;
21 import io.netty5.util.concurrent.EventExecutor;
22 import io.netty5.util.concurrent.Future;
23 import io.netty5.util.concurrent.FutureListener;
24 import io.netty5.util.concurrent.ImmediateEventExecutor;
25
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Map.Entry;
32
33
34
35
36 final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements ChannelGroupFuture {
37
38 private final ChannelGroup group;
39 private final Map<Channel, Future<Void>> futures;
40 private int successCount;
41 private int failureCount;
42
43 private final FutureListener<Void> childListener = new FutureListener<>() {
44 @Override
45 public void operationComplete(Future<? extends Void> future) throws Exception {
46 boolean success = future.isSuccess();
47 boolean callSetDone;
48 synchronized (DefaultChannelGroupFuture.this) {
49 if (success) {
50 successCount ++;
51 } else {
52 failureCount ++;
53 }
54
55 callSetDone = successCount + failureCount == futures.size();
56 assert successCount + failureCount <= futures.size();
57 }
58
59 if (callSetDone) {
60 if (failureCount > 0) {
61 List<Map.Entry<Channel, Throwable>> failed =
62 new ArrayList<>(failureCount);
63 for (Entry<Channel, Future<Void>> entry: futures.entrySet()) {
64 if (entry.getValue().isFailed()) {
65 failed.add(new DefaultEntry<>(entry.getKey(), entry.getValue().cause()));
66 }
67 }
68 setFailure0(new ChannelGroupException(failed));
69 } else {
70 setSuccess0();
71 }
72 }
73 }
74 };
75
76 DefaultChannelGroupFuture(ChannelGroup group, Map<Channel, Future<Void>> futures, EventExecutor executor) {
77 super(executor);
78 this.group = group;
79 this.futures = Collections.unmodifiableMap(futures);
80 for (Future<Void> f: this.futures.values()) {
81 f.addListener(childListener);
82 }
83
84
85 if (this.futures.isEmpty()) {
86 setSuccess0();
87 }
88 }
89
90 @Override
91 public ChannelGroup group() {
92 return group;
93 }
94
95 @Override
96 public Future<Void> find(Channel channel) {
97 return futures.get(channel);
98 }
99
100 @Override
101 public Iterator<Future<Void>> iterator() {
102 return futures.values().iterator();
103 }
104
105 @Override
106 public synchronized boolean isPartialSuccess() {
107 return successCount != 0 && successCount != futures.size();
108 }
109
110 @Override
111 public synchronized boolean isPartialFailure() {
112 return failureCount != 0 && failureCount != futures.size();
113 }
114
115 @Override
116 public DefaultChannelGroupFuture addListener(FutureListener<? super Void> listener) {
117 super.addListener(listener);
118 return this;
119 }
120
121 @Override
122 public DefaultChannelGroupFuture await() throws InterruptedException {
123 super.await();
124 return this;
125 }
126
127 @Override
128 public ChannelGroupException cause() {
129 return (ChannelGroupException) super.cause();
130 }
131
132 private void setSuccess0() {
133 super.setSuccess(null);
134 }
135
136 private void setFailure0(ChannelGroupException cause) {
137 super.setFailure(cause);
138 }
139
140 @Override
141 public DefaultChannelGroupFuture setSuccess(Void result) {
142 throw new IllegalStateException();
143 }
144
145 @Override
146 public boolean trySuccess(Void result) {
147 throw new IllegalStateException();
148 }
149
150 @Override
151 public DefaultChannelGroupFuture setFailure(Throwable cause) {
152 throw new IllegalStateException();
153 }
154
155 @Override
156 public boolean tryFailure(Throwable cause) {
157 throw new IllegalStateException();
158 }
159
160 @Override
161 protected void checkDeadLock() {
162 EventExecutor e = executor();
163 if (e != null && e != ImmediateEventExecutor.INSTANCE && e.inEventLoop()) {
164 throw new BlockingOperationException();
165 }
166 }
167
168 private static final class DefaultEntry<K, V> implements Map.Entry<K, V> {
169 private final K key;
170 private final V value;
171
172 DefaultEntry(K key, V value) {
173 this.key = key;
174 this.value = value;
175 }
176
177 @Override
178 public K getKey() {
179 return key;
180 }
181
182 @Override
183 public V getValue() {
184 return value;
185 }
186
187 @Override
188 public V setValue(V value) {
189 throw new UnsupportedOperationException("read-only");
190 }
191 }
192 }