1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import org.jboss.netty.logging.InternalLogger;
19 import org.jboss.netty.logging.InternalLoggerFactory;
20 import org.jboss.netty.util.internal.DeadLockProofWorker;
21
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.concurrent.TimeUnit;
26
27 import static java.util.concurrent.TimeUnit.*;
28
29
30
31
32
33
34
35 public class DefaultChannelFuture implements ChannelFuture {
36
37 private static final InternalLogger logger =
38 InternalLoggerFactory.getInstance(DefaultChannelFuture.class);
39
40 private static final Throwable CANCELLED = new Throwable();
41
42 private static volatile boolean useDeadLockChecker = true;
43 private static boolean disabledDeadLockCheckerOnce;
44
45
46
47
48 public static boolean isUseDeadLockChecker() {
49 return useDeadLockChecker;
50 }
51
52
53
54
55
56 public static void setUseDeadLockChecker(boolean useDeadLockChecker) {
57 if (!useDeadLockChecker && !disabledDeadLockCheckerOnce) {
58 disabledDeadLockCheckerOnce = true;
59 if (logger.isDebugEnabled()) {
60 logger.debug(
61 "The dead lock checker in " +
62 DefaultChannelFuture.class.getSimpleName() +
63 " has been disabled as requested at your own risk.");
64 }
65 }
66 DefaultChannelFuture.useDeadLockChecker = useDeadLockChecker;
67 }
68
69 private final Channel channel;
70 private final boolean cancellable;
71
72 private ChannelFutureListener firstListener;
73 private List<ChannelFutureListener> otherListeners;
74 private List<ChannelFutureProgressListener> progressListeners;
75 private boolean done;
76 private Throwable cause;
77 private int waiters;
78
79
80
81
82
83
84
85
86
87 public DefaultChannelFuture(Channel channel, boolean cancellable) {
88 this.channel = channel;
89 this.cancellable = cancellable;
90 }
91
92 public Channel getChannel() {
93 return channel;
94 }
95
96 public synchronized boolean isDone() {
97 return done;
98 }
99
100 public synchronized boolean isSuccess() {
101 return done && cause == null;
102 }
103
104 public synchronized Throwable getCause() {
105 if (cause != CANCELLED) {
106 return cause;
107 } else {
108 return null;
109 }
110 }
111
112 public synchronized boolean isCancelled() {
113 return cause == CANCELLED;
114 }
115
116 public void addListener(ChannelFutureListener listener) {
117 if (listener == null) {
118 throw new NullPointerException("listener");
119 }
120
121 boolean notifyNow = false;
122 synchronized (this) {
123 if (done) {
124 notifyNow = true;
125 } else {
126 if (firstListener == null) {
127 firstListener = listener;
128 } else {
129 if (otherListeners == null) {
130 otherListeners = new ArrayList<ChannelFutureListener>(1);
131 }
132 otherListeners.add(listener);
133 }
134
135 if (listener instanceof ChannelFutureProgressListener) {
136 if (progressListeners == null) {
137 progressListeners = new ArrayList<ChannelFutureProgressListener>(1);
138 }
139 progressListeners.add((ChannelFutureProgressListener) listener);
140 }
141 }
142 }
143
144 if (notifyNow) {
145 notifyListener(listener);
146 }
147 }
148
149 public void removeListener(ChannelFutureListener listener) {
150 if (listener == null) {
151 throw new NullPointerException("listener");
152 }
153
154 synchronized (this) {
155 if (!done) {
156 if (listener == firstListener) {
157 if (otherListeners != null && !otherListeners.isEmpty()) {
158 firstListener = otherListeners.remove(0);
159 } else {
160 firstListener = null;
161 }
162 } else if (otherListeners != null) {
163 otherListeners.remove(listener);
164 }
165
166 if (listener instanceof ChannelFutureProgressListener) {
167 progressListeners.remove(listener);
168 }
169 }
170 }
171 }
172
173 public ChannelFuture rethrowIfFailed() throws Exception {
174 if (!isDone()) {
175 return this;
176 }
177
178 Throwable cause = getCause();
179 if (cause == null) {
180 return this;
181 }
182
183 if (cause instanceof Exception) {
184 throw (Exception) cause;
185 }
186
187 if (cause instanceof Error) {
188 throw (Error) cause;
189 }
190
191 throw new RuntimeException(cause);
192 }
193
194 public ChannelFuture sync() throws InterruptedException {
195 await();
196 rethrowIfFailed0();
197 return this;
198 }
199
200 public ChannelFuture syncUninterruptibly() {
201 awaitUninterruptibly();
202 rethrowIfFailed0();
203 return this;
204 }
205
206 private void rethrowIfFailed0() {
207 Throwable cause = getCause();
208 if (cause == null) {
209 return;
210 }
211
212 if (cause instanceof RuntimeException) {
213 throw (RuntimeException) cause;
214 }
215
216 if (cause instanceof Error) {
217 throw (Error) cause;
218 }
219
220 throw new ChannelException(cause);
221 }
222
223 public ChannelFuture await() throws InterruptedException {
224 if (Thread.interrupted()) {
225 throw new InterruptedException();
226 }
227
228 synchronized (this) {
229 while (!done) {
230 checkDeadLock();
231 waiters++;
232 try {
233 wait();
234 } finally {
235 waiters--;
236 }
237 }
238 }
239 return this;
240 }
241
242 public boolean await(long timeout, TimeUnit unit)
243 throws InterruptedException {
244 return await0(unit.toNanos(timeout), true);
245 }
246
247 public boolean await(long timeoutMillis) throws InterruptedException {
248 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
249 }
250
251 public ChannelFuture awaitUninterruptibly() {
252 boolean interrupted = false;
253 synchronized (this) {
254 while (!done) {
255 checkDeadLock();
256 waiters++;
257 try {
258 wait();
259 } catch (InterruptedException e) {
260 interrupted = true;
261 } finally {
262 waiters--;
263 }
264 }
265 }
266
267 if (interrupted) {
268 Thread.currentThread().interrupt();
269 }
270
271 return this;
272 }
273
274 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
275 try {
276 return await0(unit.toNanos(timeout), false);
277 } catch (InterruptedException e) {
278 throw new InternalError();
279 }
280 }
281
282 public boolean awaitUninterruptibly(long timeoutMillis) {
283 try {
284 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
285 } catch (InterruptedException e) {
286 throw new InternalError();
287 }
288 }
289
290 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
291 if (interruptable && Thread.interrupted()) {
292 throw new InterruptedException();
293 }
294
295 long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
296 long waitTime = timeoutNanos;
297 boolean interrupted = false;
298
299 try {
300 synchronized (this) {
301 if (done) {
302 return done;
303 } else if (waitTime <= 0) {
304 return done;
305 }
306
307 checkDeadLock();
308 waiters++;
309 try {
310 for (;;) {
311 try {
312 wait(waitTime / 1000000, (int) (waitTime % 1000000));
313 } catch (InterruptedException e) {
314 if (interruptable) {
315 throw e;
316 } else {
317 interrupted = true;
318 }
319 }
320
321 if (done) {
322 return true;
323 } else {
324 waitTime = timeoutNanos - (System.nanoTime() - startTime);
325 if (waitTime <= 0) {
326 return done;
327 }
328 }
329 }
330 } finally {
331 waiters--;
332 }
333 }
334 } finally {
335 if (interrupted) {
336 Thread.currentThread().interrupt();
337 }
338 }
339 }
340
341 private static void checkDeadLock() {
342 if (isUseDeadLockChecker() && DeadLockProofWorker.PARENT.get() != null) {
343 throw new IllegalStateException(
344 "await*() in I/O thread causes a dead lock or " +
345 "sudden performance drop. Use addListener() instead or " +
346 "call await*() from a different thread.");
347 }
348 }
349
350 public boolean setSuccess() {
351 synchronized (this) {
352
353 if (done) {
354 return false;
355 }
356
357 done = true;
358 if (waiters > 0) {
359 notifyAll();
360 }
361 }
362
363 notifyListeners();
364 return true;
365 }
366
367 public boolean setFailure(Throwable cause) {
368 synchronized (this) {
369
370 if (done) {
371 return false;
372 }
373
374 this.cause = cause;
375 done = true;
376 if (waiters > 0) {
377 notifyAll();
378 }
379 }
380
381 notifyListeners();
382 return true;
383 }
384
385 public boolean cancel() {
386 if (!cancellable) {
387 return false;
388 }
389
390 synchronized (this) {
391
392 if (done) {
393 return false;
394 }
395
396 cause = CANCELLED;
397 done = true;
398 if (waiters > 0) {
399 notifyAll();
400 }
401 }
402
403 notifyListeners();
404 return true;
405 }
406
407 private void notifyListeners() {
408
409
410
411
412
413 if (firstListener != null) {
414 notifyListener(firstListener);
415 firstListener = null;
416
417 if (otherListeners != null) {
418 for (ChannelFutureListener l: otherListeners) {
419 notifyListener(l);
420 }
421 otherListeners = null;
422 }
423 }
424 }
425
426 private void notifyListener(ChannelFutureListener l) {
427 try {
428 l.operationComplete(this);
429 } catch (Throwable t) {
430 if (logger.isWarnEnabled()) {
431 logger.warn(
432 "An exception was thrown by " +
433 ChannelFutureListener.class.getSimpleName() + '.', t);
434 }
435 }
436 }
437
438 public boolean setProgress(long amount, long current, long total) {
439 ChannelFutureProgressListener[] plisteners;
440 synchronized (this) {
441
442 if (done) {
443 return false;
444 }
445
446 Collection<ChannelFutureProgressListener> progressListeners =
447 this.progressListeners;
448 if (progressListeners == null || progressListeners.isEmpty()) {
449
450 return true;
451 }
452
453 plisteners = progressListeners.toArray(
454 new ChannelFutureProgressListener[progressListeners.size()]);
455 }
456
457 for (ChannelFutureProgressListener pl: plisteners) {
458 notifyProgressListener(pl, amount, current, total);
459 }
460
461 return true;
462 }
463
464 private void notifyProgressListener(
465 ChannelFutureProgressListener l,
466 long amount, long current, long total) {
467
468 try {
469 l.operationProgressed(this, amount, current, total);
470 } catch (Throwable t) {
471 if (logger.isWarnEnabled()) {
472 logger.warn(
473 "An exception was thrown by " +
474 ChannelFutureProgressListener.class.getSimpleName() + '.', t);
475 }
476 }
477 }
478 }