1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.group;
17
18 import static java.util.concurrent.TimeUnit.*;
19
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.Iterator;
24 import java.util.LinkedHashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.TimeUnit;
28
29 import org.jboss.netty.channel.Channel;
30 import org.jboss.netty.channel.ChannelFuture;
31 import org.jboss.netty.channel.ChannelFutureListener;
32 import org.jboss.netty.logging.InternalLogger;
33 import org.jboss.netty.logging.InternalLoggerFactory;
34 import org.jboss.netty.util.internal.DeadLockProofWorker;
35
36
37
38
39 public class DefaultChannelGroupFuture implements ChannelGroupFuture {
40
41 private static final InternalLogger logger =
42 InternalLoggerFactory.getInstance(DefaultChannelGroupFuture.class);
43
44 private final ChannelGroup group;
45 final Map<Integer, ChannelFuture> futures;
46 private ChannelGroupFutureListener firstListener;
47 private List<ChannelGroupFutureListener> otherListeners;
48 private boolean done;
49 int successCount;
50 int failureCount;
51 private int waiters;
52
53 private final ChannelFutureListener childListener = new ChannelFutureListener() {
54 public void operationComplete(ChannelFuture future) throws Exception {
55 boolean success = future.isSuccess();
56 boolean callSetDone = false;
57 synchronized (DefaultChannelGroupFuture.this) {
58 if (success) {
59 successCount ++;
60 } else {
61 failureCount ++;
62 }
63
64 callSetDone = successCount + failureCount == futures.size();
65 assert successCount + failureCount <= futures.size();
66 }
67
68 if (callSetDone) {
69 setDone();
70 }
71 }
72 };
73
74
75
76
77 public DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures) {
78 if (group == null) {
79 throw new NullPointerException("group");
80 }
81 if (futures == null) {
82 throw new NullPointerException("futures");
83 }
84
85 this.group = group;
86
87 Map<Integer, ChannelFuture> futureMap = new LinkedHashMap<Integer, ChannelFuture>();
88 for (ChannelFuture f: futures) {
89 futureMap.put(f.getChannel().getId(), f);
90 }
91
92 this.futures = Collections.unmodifiableMap(futureMap);
93
94 for (ChannelFuture f: this.futures.values()) {
95 f.addListener(childListener);
96 }
97
98
99 if (this.futures.isEmpty()) {
100 setDone();
101 }
102 }
103
104 DefaultChannelGroupFuture(ChannelGroup group, Map<Integer, ChannelFuture> futures) {
105 this.group = group;
106 this.futures = Collections.unmodifiableMap(futures);
107 for (ChannelFuture f: this.futures.values()) {
108 f.addListener(childListener);
109 }
110
111
112 if (this.futures.isEmpty()) {
113 setDone();
114 }
115 }
116
117 public ChannelGroup getGroup() {
118 return group;
119 }
120
121 public ChannelFuture find(Integer channelId) {
122 return futures.get(channelId);
123 }
124
125 public ChannelFuture find(Channel channel) {
126 return futures.get(channel.getId());
127 }
128
129 public Iterator<ChannelFuture> iterator() {
130 return futures.values().iterator();
131 }
132
133 public synchronized boolean isDone() {
134 return done;
135 }
136
137 public synchronized boolean isCompleteSuccess() {
138 return successCount == futures.size();
139 }
140
141 public synchronized boolean isPartialSuccess() {
142 return successCount != 0 && successCount != futures.size();
143 }
144
145 public synchronized boolean isPartialFailure() {
146 return failureCount != 0 && failureCount != futures.size();
147 }
148
149 public synchronized boolean isCompleteFailure() {
150 int futureCnt = futures.size();
151 return futureCnt != 0 && failureCount == futureCnt;
152 }
153
154 public void addListener(ChannelGroupFutureListener listener) {
155 if (listener == null) {
156 throw new NullPointerException("listener");
157 }
158
159 boolean notifyNow = false;
160 synchronized (this) {
161 if (done) {
162 notifyNow = true;
163 } else {
164 if (firstListener == null) {
165 firstListener = listener;
166 } else {
167 if (otherListeners == null) {
168 otherListeners = new ArrayList<ChannelGroupFutureListener>(1);
169 }
170 otherListeners.add(listener);
171 }
172 }
173 }
174
175 if (notifyNow) {
176 notifyListener(listener);
177 }
178 }
179
180 public void removeListener(ChannelGroupFutureListener listener) {
181 if (listener == null) {
182 throw new NullPointerException("listener");
183 }
184
185 synchronized (this) {
186 if (!done) {
187 if (listener == firstListener) {
188 if (otherListeners != null && !otherListeners.isEmpty()) {
189 firstListener = otherListeners.remove(0);
190 } else {
191 firstListener = null;
192 }
193 } else if (otherListeners != null) {
194 otherListeners.remove(listener);
195 }
196 }
197 }
198 }
199
200 public ChannelGroupFuture await() throws InterruptedException {
201 if (Thread.interrupted()) {
202 throw new InterruptedException();
203 }
204
205 synchronized (this) {
206 while (!done) {
207 checkDeadLock();
208 waiters++;
209 try {
210 wait();
211 } finally {
212 waiters--;
213 }
214 }
215 }
216 return this;
217 }
218
219 public boolean await(long timeout, TimeUnit unit)
220 throws InterruptedException {
221 return await0(unit.toNanos(timeout), true);
222 }
223
224 public boolean await(long timeoutMillis) throws InterruptedException {
225 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
226 }
227
228 public ChannelGroupFuture awaitUninterruptibly() {
229 boolean interrupted = false;
230 synchronized (this) {
231 while (!done) {
232 checkDeadLock();
233 waiters++;
234 try {
235 wait();
236 } catch (InterruptedException e) {
237 interrupted = true;
238 } finally {
239 waiters--;
240 }
241 }
242 }
243
244 if (interrupted) {
245 Thread.currentThread().interrupt();
246 }
247
248 return this;
249 }
250
251 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
252 try {
253 return await0(unit.toNanos(timeout), false);
254 } catch (InterruptedException e) {
255 throw new InternalError();
256 }
257 }
258
259 public boolean awaitUninterruptibly(long timeoutMillis) {
260 try {
261 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
262 } catch (InterruptedException e) {
263 throw new InternalError();
264 }
265 }
266
267 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
268 if (interruptable && Thread.interrupted()) {
269 throw new InterruptedException();
270 }
271
272 long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
273 long waitTime = timeoutNanos;
274 boolean interrupted = false;
275
276 try {
277 synchronized (this) {
278 if (done) {
279 return done;
280 } else if (waitTime <= 0) {
281 return done;
282 }
283
284 checkDeadLock();
285 waiters++;
286 try {
287 for (;;) {
288 try {
289 wait(waitTime / 1000000, (int) (waitTime % 1000000));
290 } catch (InterruptedException e) {
291 if (interruptable) {
292 throw e;
293 } else {
294 interrupted = true;
295 }
296 }
297
298 if (done) {
299 return true;
300 } else {
301 waitTime = timeoutNanos - (System.nanoTime() - startTime);
302 if (waitTime <= 0) {
303 return done;
304 }
305 }
306 }
307 } finally {
308 waiters--;
309 }
310 }
311 } finally {
312 if (interrupted) {
313 Thread.currentThread().interrupt();
314 }
315 }
316 }
317
318 private static void checkDeadLock() {
319 if (DeadLockProofWorker.PARENT.get() != null) {
320 throw new IllegalStateException(
321 "await*() in I/O thread causes a dead lock or " +
322 "sudden performance drop. Use addListener() instead or " +
323 "call await*() from a different thread.");
324 }
325 }
326
327 boolean setDone() {
328 synchronized (this) {
329
330 if (done) {
331 return false;
332 }
333
334 done = true;
335 if (waiters > 0) {
336 notifyAll();
337 }
338 }
339
340 notifyListeners();
341 return true;
342 }
343
344 private void notifyListeners() {
345
346
347
348
349
350 if (firstListener != null) {
351 notifyListener(firstListener);
352 firstListener = null;
353
354 if (otherListeners != null) {
355 for (ChannelGroupFutureListener l: otherListeners) {
356 notifyListener(l);
357 }
358 otherListeners = null;
359 }
360 }
361 }
362
363 private void notifyListener(ChannelGroupFutureListener l) {
364 try {
365 l.operationComplete(this);
366 } catch (Throwable t) {
367 if (logger.isWarnEnabled()) {
368 logger.warn(
369 "An exception was thrown by " +
370 ChannelFutureListener.class.getSimpleName() + '.', t);
371 }
372 }
373 }
374 }