1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import org.jboss.netty.logging.InternalLogger;
19 import org.jboss.netty.logging.InternalLoggerFactory;
20 import org.jboss.netty.util.internal.DeadLockProofWorker;
21
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.concurrent.TimeUnit;
26
27 import static java.util.concurrent.TimeUnit.*;
28
29
30
31
32
33
34
35 public class DefaultChannelFuture implements ChannelFuture {
36
37 private static final InternalLogger logger =
38 InternalLoggerFactory.getInstance(DefaultChannelFuture.class);
39
40 private static final Throwable CANCELLED = new Throwable();
41
42 private static volatile boolean useDeadLockChecker = true;
43 private static boolean disabledDeadLockCheckerOnce;
44
45
46
47
48 public static boolean isUseDeadLockChecker() {
49 return useDeadLockChecker;
50 }
51
52
53
54
55
56 public static void setUseDeadLockChecker(boolean useDeadLockChecker) {
57 if (!useDeadLockChecker && !disabledDeadLockCheckerOnce) {
58 disabledDeadLockCheckerOnce = true;
59 if (logger.isDebugEnabled()) {
60 logger.debug(
61 "The dead lock checker in " +
62 DefaultChannelFuture.class.getSimpleName() +
63 " has been disabled as requested at your own risk.");
64 }
65 }
66 DefaultChannelFuture.useDeadLockChecker = useDeadLockChecker;
67 }
68
69 private final Channel channel;
70 private final boolean cancellable;
71
72 private ChannelFutureListener firstListener;
73 private List<ChannelFutureListener> otherListeners;
74 private List<ChannelFutureProgressListener> progressListeners;
75 private boolean done;
76 private Throwable cause;
77 private int waiters;
78
79
80
81
82
83
84
85
86
87 public DefaultChannelFuture(Channel channel, boolean cancellable) {
88 this.channel = channel;
89 this.cancellable = cancellable;
90 }
91
92 public Channel getChannel() {
93 return channel;
94 }
95
96 public synchronized boolean isDone() {
97 return done;
98 }
99
100 public synchronized boolean isSuccess() {
101 return done && cause == null;
102 }
103
104 public synchronized Throwable getCause() {
105 if (cause != CANCELLED) {
106 return cause;
107 } else {
108 return null;
109 }
110 }
111
112 public synchronized boolean isCancelled() {
113 return cause == CANCELLED;
114 }
115
116 public void addListener(ChannelFutureListener listener) {
117 if (listener == null) {
118 throw new NullPointerException("listener");
119 }
120
121 boolean notifyNow = false;
122 synchronized (this) {
123 if (done) {
124 notifyNow = true;
125 } else {
126 if (firstListener == null) {
127 firstListener = listener;
128 } else {
129 if (otherListeners == null) {
130 otherListeners = new ArrayList<ChannelFutureListener>(1);
131 }
132 otherListeners.add(listener);
133 }
134
135 if (listener instanceof ChannelFutureProgressListener) {
136 if (progressListeners == null) {
137 progressListeners = new ArrayList<ChannelFutureProgressListener>(1);
138 }
139 progressListeners.add((ChannelFutureProgressListener) listener);
140 }
141 }
142 }
143
144 if (notifyNow) {
145 notifyListener(listener);
146 }
147 }
148
149 public void removeListener(ChannelFutureListener listener) {
150 if (listener == null) {
151 throw new NullPointerException("listener");
152 }
153
154 synchronized (this) {
155 if (!done) {
156 if (listener == firstListener) {
157 if (otherListeners != null && !otherListeners.isEmpty()) {
158 firstListener = otherListeners.remove(0);
159 } else {
160 firstListener = null;
161 }
162 } else if (otherListeners != null) {
163 otherListeners.remove(listener);
164 }
165
166 if (listener instanceof ChannelFutureProgressListener) {
167 progressListeners.remove(listener);
168 }
169 }
170 }
171 }
172
173 public ChannelFuture sync() throws InterruptedException {
174 await();
175 rethrowIfFailed0();
176 return this;
177 }
178
179 public ChannelFuture syncUninterruptibly() {
180 awaitUninterruptibly();
181 rethrowIfFailed0();
182 return this;
183 }
184
185 private void rethrowIfFailed0() {
186 Throwable cause = getCause();
187 if (cause == null) {
188 return;
189 }
190
191 if (cause instanceof RuntimeException) {
192 throw (RuntimeException) cause;
193 }
194
195 if (cause instanceof Error) {
196 throw (Error) cause;
197 }
198
199 throw new ChannelException(cause);
200 }
201
202 public ChannelFuture await() throws InterruptedException {
203 if (Thread.interrupted()) {
204 throw new InterruptedException();
205 }
206
207 synchronized (this) {
208 while (!done) {
209 checkDeadLock();
210 waiters++;
211 try {
212 wait();
213 } finally {
214 waiters--;
215 }
216 }
217 }
218 return this;
219 }
220
221 public boolean await(long timeout, TimeUnit unit)
222 throws InterruptedException {
223 return await0(unit.toNanos(timeout), true);
224 }
225
226 public boolean await(long timeoutMillis) throws InterruptedException {
227 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
228 }
229
230 public ChannelFuture awaitUninterruptibly() {
231 boolean interrupted = false;
232 synchronized (this) {
233 while (!done) {
234 checkDeadLock();
235 waiters++;
236 try {
237 wait();
238 } catch (InterruptedException e) {
239 interrupted = true;
240 } finally {
241 waiters--;
242 }
243 }
244 }
245
246 if (interrupted) {
247 Thread.currentThread().interrupt();
248 }
249
250 return this;
251 }
252
253 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
254 try {
255 return await0(unit.toNanos(timeout), false);
256 } catch (InterruptedException e) {
257 throw new InternalError();
258 }
259 }
260
261 public boolean awaitUninterruptibly(long timeoutMillis) {
262 try {
263 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
264 } catch (InterruptedException e) {
265 throw new InternalError();
266 }
267 }
268
269 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
270 if (interruptable && Thread.interrupted()) {
271 throw new InterruptedException();
272 }
273
274 long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
275 long waitTime = timeoutNanos;
276 boolean interrupted = false;
277
278 try {
279 synchronized (this) {
280 if (done || waitTime <= 0) {
281 return done;
282 }
283
284 checkDeadLock();
285 waiters++;
286 try {
287 for (;;) {
288 try {
289 wait(waitTime / 1000000, (int) (waitTime % 1000000));
290 } catch (InterruptedException e) {
291 if (interruptable) {
292 throw e;
293 } else {
294 interrupted = true;
295 }
296 }
297
298 if (done) {
299 return true;
300 } else {
301 waitTime = timeoutNanos - (System.nanoTime() - startTime);
302 if (waitTime <= 0) {
303 return done;
304 }
305 }
306 }
307 } finally {
308 waiters--;
309 }
310 }
311 } finally {
312 if (interrupted) {
313 Thread.currentThread().interrupt();
314 }
315 }
316 }
317
318 private static void checkDeadLock() {
319 if (isUseDeadLockChecker() && DeadLockProofWorker.PARENT.get() != null) {
320 throw new IllegalStateException(
321 "await*() in I/O thread causes a dead lock or " +
322 "sudden performance drop. Use addListener() instead or " +
323 "call await*() from a different thread.");
324 }
325 }
326
327 public boolean setSuccess() {
328 synchronized (this) {
329
330 if (done) {
331 return false;
332 }
333
334 done = true;
335 if (waiters > 0) {
336 notifyAll();
337 }
338 }
339
340 notifyListeners();
341 return true;
342 }
343
344 public boolean setFailure(Throwable cause) {
345 if (cause == null) {
346 throw new NullPointerException("cause");
347 }
348
349 synchronized (this) {
350
351 if (done) {
352 return false;
353 }
354
355 this.cause = cause;
356 done = true;
357 if (waiters > 0) {
358 notifyAll();
359 }
360 }
361
362 notifyListeners();
363 return true;
364 }
365
366 public boolean cancel() {
367 if (!cancellable) {
368 return false;
369 }
370
371 synchronized (this) {
372
373 if (done) {
374 return false;
375 }
376
377 cause = CANCELLED;
378 done = true;
379 if (waiters > 0) {
380 notifyAll();
381 }
382 }
383
384 notifyListeners();
385 return true;
386 }
387
388 private void notifyListeners() {
389
390
391
392
393
394 if (firstListener != null) {
395 notifyListener(firstListener);
396 firstListener = null;
397
398 if (otherListeners != null) {
399 for (ChannelFutureListener l: otherListeners) {
400 notifyListener(l);
401 }
402 otherListeners = null;
403 }
404 }
405 }
406
407 private void notifyListener(ChannelFutureListener l) {
408 try {
409 l.operationComplete(this);
410 } catch (Throwable t) {
411 if (logger.isWarnEnabled()) {
412 logger.warn(
413 "An exception was thrown by " +
414 ChannelFutureListener.class.getSimpleName() + '.', t);
415 }
416 }
417 }
418
419 public boolean setProgress(long amount, long current, long total) {
420 ChannelFutureProgressListener[] plisteners;
421 synchronized (this) {
422
423 if (done) {
424 return false;
425 }
426
427 Collection<ChannelFutureProgressListener> progressListeners =
428 this.progressListeners;
429 if (progressListeners == null || progressListeners.isEmpty()) {
430
431 return true;
432 }
433
434 plisteners = progressListeners.toArray(
435 new ChannelFutureProgressListener[progressListeners.size()]);
436 }
437
438 for (ChannelFutureProgressListener pl: plisteners) {
439 notifyProgressListener(pl, amount, current, total);
440 }
441
442 return true;
443 }
444
445 private void notifyProgressListener(
446 ChannelFutureProgressListener l,
447 long amount, long current, long total) {
448
449 try {
450 l.operationProgressed(this, amount, current, total);
451 } catch (Throwable t) {
452 if (logger.isWarnEnabled()) {
453 logger.warn(
454 "An exception was thrown by " +
455 ChannelFutureProgressListener.class.getSimpleName() + '.', t);
456 }
457 }
458 }
459 }