1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.group;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.ChannelFutureListener;
21 import org.jboss.netty.logging.InternalLogger;
22 import org.jboss.netty.logging.InternalLoggerFactory;
23 import org.jboss.netty.util.internal.DeadLockProofWorker;
24
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Iterator;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.TimeUnit;
33
34 import static java.util.concurrent.TimeUnit.*;
35
36
37
38
39 public class DefaultChannelGroupFuture implements ChannelGroupFuture {
40
41 private static final InternalLogger logger =
42 InternalLoggerFactory.getInstance(DefaultChannelGroupFuture.class);
43
44 private final ChannelGroup group;
45 final Map<Integer, ChannelFuture> futures;
46 private ChannelGroupFutureListener firstListener;
47 private List<ChannelGroupFutureListener> otherListeners;
48 private boolean done;
49 int successCount;
50 int failureCount;
51 private int waiters;
52
53 private final ChannelFutureListener childListener = new ChannelFutureListener() {
54 public void operationComplete(ChannelFuture future) throws Exception {
55 boolean success = future.isSuccess();
56 boolean callSetDone;
57 synchronized (DefaultChannelGroupFuture.this) {
58 if (success) {
59 successCount ++;
60 } else {
61 failureCount ++;
62 }
63
64 callSetDone = successCount + failureCount == futures.size();
65 assert successCount + failureCount <= futures.size();
66 }
67
68 if (callSetDone) {
69 setDone();
70 }
71 }
72 };
73
74
75
76
77 public DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures) {
78 if (group == null) {
79 throw new NullPointerException("group");
80 }
81 if (futures == null) {
82 throw new NullPointerException("futures");
83 }
84
85 this.group = group;
86
87 Map<Integer, ChannelFuture> futureMap = new LinkedHashMap<Integer, ChannelFuture>();
88 for (ChannelFuture f: futures) {
89 futureMap.put(f.getChannel().getId(), f);
90 }
91
92 this.futures = Collections.unmodifiableMap(futureMap);
93
94 for (ChannelFuture f: this.futures.values()) {
95 f.addListener(childListener);
96 }
97
98
99 if (this.futures.isEmpty()) {
100 setDone();
101 }
102 }
103
104 DefaultChannelGroupFuture(ChannelGroup group, Map<Integer, ChannelFuture> futures) {
105 this.group = group;
106 this.futures = Collections.unmodifiableMap(futures);
107 for (ChannelFuture f: this.futures.values()) {
108 f.addListener(childListener);
109 }
110
111
112 if (this.futures.isEmpty()) {
113 setDone();
114 }
115 }
116
117 public ChannelGroup getGroup() {
118 return group;
119 }
120
121 public ChannelFuture find(Integer channelId) {
122 return futures.get(channelId);
123 }
124
125 public ChannelFuture find(Channel channel) {
126 return futures.get(channel.getId());
127 }
128
129 public Iterator<ChannelFuture> iterator() {
130 return futures.values().iterator();
131 }
132
133 public synchronized boolean isDone() {
134 return done;
135 }
136
137 public synchronized boolean isCompleteSuccess() {
138 return successCount == futures.size();
139 }
140
141 public synchronized boolean isPartialSuccess() {
142 return successCount != 0 && successCount != futures.size();
143 }
144
145 public synchronized boolean isPartialFailure() {
146 return failureCount != 0 && failureCount != futures.size();
147 }
148
149 public synchronized boolean isCompleteFailure() {
150 int futureCnt = futures.size();
151 return futureCnt != 0 && failureCount == futureCnt;
152 }
153
154 public void addListener(ChannelGroupFutureListener listener) {
155 if (listener == null) {
156 throw new NullPointerException("listener");
157 }
158
159 boolean notifyNow = false;
160 synchronized (this) {
161 if (done) {
162 notifyNow = true;
163 } else {
164 if (firstListener == null) {
165 firstListener = listener;
166 } else {
167 if (otherListeners == null) {
168 otherListeners = new ArrayList<ChannelGroupFutureListener>(1);
169 }
170 otherListeners.add(listener);
171 }
172 }
173 }
174
175 if (notifyNow) {
176 notifyListener(listener);
177 }
178 }
179
180 public void removeListener(ChannelGroupFutureListener listener) {
181 if (listener == null) {
182 throw new NullPointerException("listener");
183 }
184
185 synchronized (this) {
186 if (!done) {
187 if (listener == firstListener) {
188 if (otherListeners != null && !otherListeners.isEmpty()) {
189 firstListener = otherListeners.remove(0);
190 } else {
191 firstListener = null;
192 }
193 } else if (otherListeners != null) {
194 otherListeners.remove(listener);
195 }
196 }
197 }
198 }
199
200 public ChannelGroupFuture await() throws InterruptedException {
201 if (Thread.interrupted()) {
202 throw new InterruptedException();
203 }
204
205 synchronized (this) {
206 while (!done) {
207 checkDeadLock();
208 waiters++;
209 try {
210 wait();
211 } finally {
212 waiters--;
213 }
214 }
215 }
216 return this;
217 }
218
219 public boolean await(long timeout, TimeUnit unit)
220 throws InterruptedException {
221 return await0(unit.toNanos(timeout), true);
222 }
223
224 public boolean await(long timeoutMillis) throws InterruptedException {
225 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
226 }
227
228 public ChannelGroupFuture awaitUninterruptibly() {
229 boolean interrupted = false;
230 synchronized (this) {
231 while (!done) {
232 checkDeadLock();
233 waiters++;
234 try {
235 wait();
236 } catch (InterruptedException e) {
237 interrupted = true;
238 } finally {
239 waiters--;
240 }
241 }
242 }
243
244 if (interrupted) {
245 Thread.currentThread().interrupt();
246 }
247
248 return this;
249 }
250
251 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
252 try {
253 return await0(unit.toNanos(timeout), false);
254 } catch (InterruptedException e) {
255 throw new InternalError();
256 }
257 }
258
259 public boolean awaitUninterruptibly(long timeoutMillis) {
260 try {
261 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
262 } catch (InterruptedException e) {
263 throw new InternalError();
264 }
265 }
266
267 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
268 if (interruptable && Thread.interrupted()) {
269 throw new InterruptedException();
270 }
271
272 long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
273 long waitTime = timeoutNanos;
274 boolean interrupted = false;
275
276 try {
277 synchronized (this) {
278 if (done || waitTime <= 0) {
279 return done;
280 }
281
282 checkDeadLock();
283 waiters++;
284 try {
285 for (;;) {
286 try {
287 wait(waitTime / 1000000, (int) (waitTime % 1000000));
288 } catch (InterruptedException e) {
289 if (interruptable) {
290 throw e;
291 } else {
292 interrupted = true;
293 }
294 }
295
296 if (done) {
297 return true;
298 } else {
299 waitTime = timeoutNanos - (System.nanoTime() - startTime);
300 if (waitTime <= 0) {
301 return done;
302 }
303 }
304 }
305 } finally {
306 waiters--;
307 }
308 }
309 } finally {
310 if (interrupted) {
311 Thread.currentThread().interrupt();
312 }
313 }
314 }
315
316 private static void checkDeadLock() {
317 if (DeadLockProofWorker.PARENT.get() != null) {
318 throw new IllegalStateException(
319 "await*() in I/O thread causes a dead lock or " +
320 "sudden performance drop. Use addListener() instead or " +
321 "call await*() from a different thread.");
322 }
323 }
324
325 boolean setDone() {
326 synchronized (this) {
327
328 if (done) {
329 return false;
330 }
331
332 done = true;
333 if (waiters > 0) {
334 notifyAll();
335 }
336 }
337
338 notifyListeners();
339 return true;
340 }
341
342 private void notifyListeners() {
343
344
345
346
347
348 if (firstListener != null) {
349 notifyListener(firstListener);
350 firstListener = null;
351
352 if (otherListeners != null) {
353 for (ChannelGroupFutureListener l: otherListeners) {
354 notifyListener(l);
355 }
356 otherListeners = null;
357 }
358 }
359 }
360
361 private void notifyListener(ChannelGroupFutureListener l) {
362 try {
363 l.operationComplete(this);
364 } catch (Throwable t) {
365 if (logger.isWarnEnabled()) {
366 logger.warn(
367 "An exception was thrown by " +
368 ChannelFutureListener.class.getSimpleName() + '.', t);
369 }
370 }
371 }
372 }