1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import org.jboss.netty.logging.InternalLogger;
19 import org.jboss.netty.logging.InternalLoggerFactory;
20 import org.jboss.netty.util.internal.DeadLockProofWorker;
21
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.concurrent.TimeUnit;
26
27 import static java.util.concurrent.TimeUnit.*;
28
29
30
31
32
33
34
35 public class DefaultChannelFuture implements ChannelFuture {
36
37 private static final InternalLogger logger =
38 InternalLoggerFactory.getInstance(DefaultChannelFuture.class);
39
40 private static final Throwable CANCELLED = new Throwable();
41
42 private static volatile boolean useDeadLockChecker = true;
43 private static boolean disabledDeadLockCheckerOnce;
44
45
46
47
48 public static boolean isUseDeadLockChecker() {
49 return useDeadLockChecker;
50 }
51
52
53
54
55
56 public static void setUseDeadLockChecker(boolean useDeadLockChecker) {
57 if (!useDeadLockChecker && !disabledDeadLockCheckerOnce) {
58 disabledDeadLockCheckerOnce = true;
59 if (logger.isDebugEnabled()) {
60 logger.debug(
61 "The dead lock checker in " +
62 DefaultChannelFuture.class.getSimpleName() +
63 " has been disabled as requested at your own risk.");
64 }
65 }
66 DefaultChannelFuture.useDeadLockChecker = useDeadLockChecker;
67 }
68
69 private final Channel channel;
70 private final boolean cancellable;
71
72 private ChannelFutureListener firstListener;
73 private List<ChannelFutureListener> otherListeners;
74 private List<ChannelFutureProgressListener> progressListeners;
75 private boolean done;
76 private Throwable cause;
77 private int waiters;
78
79
80
81
82
83
84
85
86
87 public DefaultChannelFuture(Channel channel, boolean cancellable) {
88 this.channel = channel;
89 this.cancellable = cancellable;
90 }
91
92 public Channel getChannel() {
93 return channel;
94 }
95
96 public synchronized boolean isDone() {
97 return done;
98 }
99
100 public synchronized boolean isSuccess() {
101 return done && cause == null;
102 }
103
104 public synchronized Throwable getCause() {
105 if (cause != CANCELLED) {
106 return cause;
107 } else {
108 return null;
109 }
110 }
111
112 public synchronized boolean isCancelled() {
113 return cause == CANCELLED;
114 }
115
116 public void addListener(ChannelFutureListener listener) {
117 if (listener == null) {
118 throw new NullPointerException("listener");
119 }
120
121 boolean notifyNow = false;
122 synchronized (this) {
123 if (done) {
124 notifyNow = true;
125 } else {
126 if (firstListener == null) {
127 firstListener = listener;
128 } else {
129 if (otherListeners == null) {
130 otherListeners = new ArrayList<ChannelFutureListener>(1);
131 }
132 otherListeners.add(listener);
133 }
134
135 if (listener instanceof ChannelFutureProgressListener) {
136 if (progressListeners == null) {
137 progressListeners = new ArrayList<ChannelFutureProgressListener>(1);
138 }
139 progressListeners.add((ChannelFutureProgressListener) listener);
140 }
141 }
142 }
143
144 if (notifyNow) {
145 notifyListener(listener);
146 }
147 }
148
149 public void removeListener(ChannelFutureListener listener) {
150 if (listener == null) {
151 throw new NullPointerException("listener");
152 }
153
154 synchronized (this) {
155 if (!done) {
156 if (listener == firstListener) {
157 if (otherListeners != null && !otherListeners.isEmpty()) {
158 firstListener = otherListeners.remove(0);
159 } else {
160 firstListener = null;
161 }
162 } else if (otherListeners != null) {
163 otherListeners.remove(listener);
164 }
165
166 if (listener instanceof ChannelFutureProgressListener) {
167 progressListeners.remove(listener);
168 }
169 }
170 }
171 }
172
173 @Deprecated
174 public ChannelFuture rethrowIfFailed() throws Exception {
175 if (!isDone()) {
176 return this;
177 }
178
179 Throwable cause = getCause();
180 if (cause == null) {
181 return this;
182 }
183
184 if (cause instanceof Exception) {
185 throw (Exception) cause;
186 }
187
188 if (cause instanceof Error) {
189 throw (Error) cause;
190 }
191
192 throw new RuntimeException(cause);
193 }
194
195 public ChannelFuture sync() throws InterruptedException {
196 await();
197 rethrowIfFailed0();
198 return this;
199 }
200
201 public ChannelFuture syncUninterruptibly() {
202 awaitUninterruptibly();
203 rethrowIfFailed0();
204 return this;
205 }
206
207 private void rethrowIfFailed0() {
208 Throwable cause = getCause();
209 if (cause == null) {
210 return;
211 }
212
213 if (cause instanceof RuntimeException) {
214 throw (RuntimeException) cause;
215 }
216
217 if (cause instanceof Error) {
218 throw (Error) cause;
219 }
220
221 throw new ChannelException(cause);
222 }
223
224 public ChannelFuture await() throws InterruptedException {
225 if (Thread.interrupted()) {
226 throw new InterruptedException();
227 }
228
229 synchronized (this) {
230 while (!done) {
231 checkDeadLock();
232 waiters++;
233 try {
234 wait();
235 } finally {
236 waiters--;
237 }
238 }
239 }
240 return this;
241 }
242
243 public boolean await(long timeout, TimeUnit unit)
244 throws InterruptedException {
245 return await0(unit.toNanos(timeout), true);
246 }
247
248 public boolean await(long timeoutMillis) throws InterruptedException {
249 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
250 }
251
252 public ChannelFuture awaitUninterruptibly() {
253 boolean interrupted = false;
254 synchronized (this) {
255 while (!done) {
256 checkDeadLock();
257 waiters++;
258 try {
259 wait();
260 } catch (InterruptedException e) {
261 interrupted = true;
262 } finally {
263 waiters--;
264 }
265 }
266 }
267
268 if (interrupted) {
269 Thread.currentThread().interrupt();
270 }
271
272 return this;
273 }
274
275 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
276 try {
277 return await0(unit.toNanos(timeout), false);
278 } catch (InterruptedException e) {
279 throw new InternalError();
280 }
281 }
282
283 public boolean awaitUninterruptibly(long timeoutMillis) {
284 try {
285 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
286 } catch (InterruptedException e) {
287 throw new InternalError();
288 }
289 }
290
291 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
292 if (interruptable && Thread.interrupted()) {
293 throw new InterruptedException();
294 }
295
296 long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
297 long waitTime = timeoutNanos;
298 boolean interrupted = false;
299
300 try {
301 synchronized (this) {
302 if (done || waitTime <= 0) {
303 return done;
304 }
305
306 checkDeadLock();
307 waiters++;
308 try {
309 for (;;) {
310 try {
311 wait(waitTime / 1000000, (int) (waitTime % 1000000));
312 } catch (InterruptedException e) {
313 if (interruptable) {
314 throw e;
315 } else {
316 interrupted = true;
317 }
318 }
319
320 if (done) {
321 return true;
322 } else {
323 waitTime = timeoutNanos - (System.nanoTime() - startTime);
324 if (waitTime <= 0) {
325 return done;
326 }
327 }
328 }
329 } finally {
330 waiters--;
331 }
332 }
333 } finally {
334 if (interrupted) {
335 Thread.currentThread().interrupt();
336 }
337 }
338 }
339
340 private static void checkDeadLock() {
341 if (isUseDeadLockChecker() && DeadLockProofWorker.PARENT.get() != null) {
342 throw new IllegalStateException(
343 "await*() in I/O thread causes a dead lock or " +
344 "sudden performance drop. Use addListener() instead or " +
345 "call await*() from a different thread.");
346 }
347 }
348
349 public boolean setSuccess() {
350 synchronized (this) {
351
352 if (done) {
353 return false;
354 }
355
356 done = true;
357 if (waiters > 0) {
358 notifyAll();
359 }
360 }
361
362 notifyListeners();
363 return true;
364 }
365
366 public boolean setFailure(Throwable cause) {
367 synchronized (this) {
368
369 if (done) {
370 return false;
371 }
372
373 this.cause = cause;
374 done = true;
375 if (waiters > 0) {
376 notifyAll();
377 }
378 }
379
380 notifyListeners();
381 return true;
382 }
383
384 public boolean cancel() {
385 if (!cancellable) {
386 return false;
387 }
388
389 synchronized (this) {
390
391 if (done) {
392 return false;
393 }
394
395 cause = CANCELLED;
396 done = true;
397 if (waiters > 0) {
398 notifyAll();
399 }
400 }
401
402 notifyListeners();
403 return true;
404 }
405
406 private void notifyListeners() {
407
408
409
410
411
412 if (firstListener != null) {
413 notifyListener(firstListener);
414 firstListener = null;
415
416 if (otherListeners != null) {
417 for (ChannelFutureListener l: otherListeners) {
418 notifyListener(l);
419 }
420 otherListeners = null;
421 }
422 }
423 }
424
425 private void notifyListener(ChannelFutureListener l) {
426 try {
427 l.operationComplete(this);
428 } catch (Throwable t) {
429 if (logger.isWarnEnabled()) {
430 logger.warn(
431 "An exception was thrown by " +
432 ChannelFutureListener.class.getSimpleName() + '.', t);
433 }
434 }
435 }
436
437 public boolean setProgress(long amount, long current, long total) {
438 ChannelFutureProgressListener[] plisteners;
439 synchronized (this) {
440
441 if (done) {
442 return false;
443 }
444
445 Collection<ChannelFutureProgressListener> progressListeners =
446 this.progressListeners;
447 if (progressListeners == null || progressListeners.isEmpty()) {
448
449 return true;
450 }
451
452 plisteners = progressListeners.toArray(
453 new ChannelFutureProgressListener[progressListeners.size()]);
454 }
455
456 for (ChannelFutureProgressListener pl: plisteners) {
457 notifyProgressListener(pl, amount, current, total);
458 }
459
460 return true;
461 }
462
463 private void notifyProgressListener(
464 ChannelFutureProgressListener l,
465 long amount, long current, long total) {
466
467 try {
468 l.operationProgressed(this, amount, current, total);
469 } catch (Throwable t) {
470 if (logger.isWarnEnabled()) {
471 logger.warn(
472 "An exception was thrown by " +
473 ChannelFutureProgressListener.class.getSimpleName() + '.', t);
474 }
475 }
476 }
477 }