View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel;
17  
18  import org.jboss.netty.logging.InternalLogger;
19  import org.jboss.netty.logging.InternalLoggerFactory;
20  import org.jboss.netty.util.internal.DeadLockProofWorker;
21  
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.concurrent.TimeUnit;
26  
27  import static java.util.concurrent.TimeUnit.*;
28  
29  /**
30   * The default {@link ChannelFuture} implementation.  It is recommended to
31   * use {@link Channels#future(Channel)} and {@link Channels#future(Channel, boolean)}
32   * to create a new {@link ChannelFuture} rather than calling the constructor
33   * explicitly.
34   */
35  public class DefaultChannelFuture implements ChannelFuture {
36  
37      private static final InternalLogger logger =
38          InternalLoggerFactory.getInstance(DefaultChannelFuture.class);
39  
40      private static final Throwable CANCELLED = new Throwable();
41  
42      private static volatile boolean useDeadLockChecker = true;
43      private static boolean disabledDeadLockCheckerOnce;
44  
45      /**
46       * Returns {@code true} if and only if the dead lock checker is enabled.
47       */
48      public static boolean isUseDeadLockChecker() {
49          return useDeadLockChecker;
50      }
51  
52      /**
53       * Enables or disables the dead lock checker.  It is not recommended to
54       * disable the dead lock checker.  Disable it at your own risk!
55       */
56      public static void setUseDeadLockChecker(boolean useDeadLockChecker) {
57          if (!useDeadLockChecker && !disabledDeadLockCheckerOnce) {
58              disabledDeadLockCheckerOnce = true;
59              if (logger.isDebugEnabled()) {
60                  logger.debug(
61                          "The dead lock checker in " +
62                          DefaultChannelFuture.class.getSimpleName() +
63                          " has been disabled as requested at your own risk.");
64              }
65          }
66          DefaultChannelFuture.useDeadLockChecker = useDeadLockChecker;
67      }
68  
69      private final Channel channel;
70      private final boolean cancellable;
71  
72      private ChannelFutureListener firstListener;
73      private List<ChannelFutureListener> otherListeners;
74      private List<ChannelFutureProgressListener> progressListeners;
75      private boolean done;
76      private Throwable cause;
77      private int waiters;
78  
79      /**
80       * Creates a new instance.
81       *
82       * @param channel
83       *        the {@link Channel} associated with this future
84       * @param cancellable
85       *        {@code true} if and only if this future can be canceled
86       */
87      public DefaultChannelFuture(Channel channel, boolean cancellable) {
88          this.channel = channel;
89          this.cancellable = cancellable;
90      }
91  
92      public Channel getChannel() {
93          return channel;
94      }
95  
96      public synchronized boolean isDone() {
97          return done;
98      }
99  
100     public synchronized boolean isSuccess() {
101         return done && cause == null;
102     }
103 
104     public synchronized Throwable getCause() {
105         if (cause != CANCELLED) {
106             return cause;
107         } else {
108             return null;
109         }
110     }
111 
112     public synchronized boolean isCancelled() {
113         return cause == CANCELLED;
114     }
115 
116     public void addListener(ChannelFutureListener listener) {
117         if (listener == null) {
118             throw new NullPointerException("listener");
119         }
120 
121         boolean notifyNow = false;
122         synchronized (this) {
123             if (done) {
124                 notifyNow = true;
125             } else {
126                 if (firstListener == null) {
127                     firstListener = listener;
128                 } else {
129                     if (otherListeners == null) {
130                         otherListeners = new ArrayList<ChannelFutureListener>(1);
131                     }
132                     otherListeners.add(listener);
133                 }
134 
135                 if (listener instanceof ChannelFutureProgressListener) {
136                     if (progressListeners == null) {
137                         progressListeners = new ArrayList<ChannelFutureProgressListener>(1);
138                     }
139                     progressListeners.add((ChannelFutureProgressListener) listener);
140                 }
141             }
142         }
143 
144         if (notifyNow) {
145             notifyListener(listener);
146         }
147     }
148 
149     public void removeListener(ChannelFutureListener listener) {
150         if (listener == null) {
151             throw new NullPointerException("listener");
152         }
153 
154         synchronized (this) {
155             if (!done) {
156                 if (listener == firstListener) {
157                     if (otherListeners != null && !otherListeners.isEmpty()) {
158                         firstListener = otherListeners.remove(0);
159                     } else {
160                         firstListener = null;
161                     }
162                 } else if (otherListeners != null) {
163                     otherListeners.remove(listener);
164                 }
165 
166                 if (listener instanceof ChannelFutureProgressListener) {
167                     progressListeners.remove(listener);
168                 }
169             }
170         }
171     }
172 
173     public ChannelFuture rethrowIfFailed() throws Exception {
174         if (!isDone()) {
175             return this;
176         }
177 
178         Throwable cause = getCause();
179         if (cause == null) {
180             return this;
181         }
182 
183         if (cause instanceof Exception) {
184             throw (Exception) cause;
185         }
186 
187         if (cause instanceof Error) {
188             throw (Error) cause;
189         }
190 
191         throw new RuntimeException(cause);
192     }
193 
194     public ChannelFuture sync() throws InterruptedException {
195         await();
196         rethrowIfFailed0();
197         return this;
198     }
199 
200     public ChannelFuture syncUninterruptibly() {
201         awaitUninterruptibly();
202         rethrowIfFailed0();
203         return this;
204     }
205 
206     private void rethrowIfFailed0() {
207         Throwable cause = getCause();
208         if (cause == null) {
209             return;
210         }
211 
212         if (cause instanceof RuntimeException) {
213             throw (RuntimeException) cause;
214         }
215 
216         if (cause instanceof Error) {
217             throw (Error) cause;
218         }
219 
220         throw new ChannelException(cause);
221     }
222 
223     public ChannelFuture await() throws InterruptedException {
224         if (Thread.interrupted()) {
225             throw new InterruptedException();
226         }
227 
228         synchronized (this) {
229             while (!done) {
230                 checkDeadLock();
231                 waiters++;
232                 try {
233                     wait();
234                 } finally {
235                     waiters--;
236                 }
237             }
238         }
239         return this;
240     }
241 
242     public boolean await(long timeout, TimeUnit unit)
243             throws InterruptedException {
244         return await0(unit.toNanos(timeout), true);
245     }
246 
247     public boolean await(long timeoutMillis) throws InterruptedException {
248         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
249     }
250 
251     public ChannelFuture awaitUninterruptibly() {
252         boolean interrupted = false;
253         synchronized (this) {
254             while (!done) {
255                 checkDeadLock();
256                 waiters++;
257                 try {
258                     wait();
259                 } catch (InterruptedException e) {
260                     interrupted = true;
261                 } finally {
262                     waiters--;
263                 }
264             }
265         }
266 
267         if (interrupted) {
268             Thread.currentThread().interrupt();
269         }
270 
271         return this;
272     }
273 
274     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
275         try {
276             return await0(unit.toNanos(timeout), false);
277         } catch (InterruptedException e) {
278             throw new InternalError();
279         }
280     }
281 
282     public boolean awaitUninterruptibly(long timeoutMillis) {
283         try {
284             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
285         } catch (InterruptedException e) {
286             throw new InternalError();
287         }
288     }
289 
290     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
291         if (interruptable && Thread.interrupted()) {
292             throw new InterruptedException();
293         }
294 
295         long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
296         long waitTime = timeoutNanos;
297         boolean interrupted = false;
298 
299         try {
300             synchronized (this) {
301                 if (done) {
302                     return done;
303                 } else if (waitTime <= 0) {
304                     return done;
305                 }
306 
307                 checkDeadLock();
308                 waiters++;
309                 try {
310                     for (;;) {
311                         try {
312                             wait(waitTime / 1000000, (int) (waitTime % 1000000));
313                         } catch (InterruptedException e) {
314                             if (interruptable) {
315                                 throw e;
316                             } else {
317                                 interrupted = true;
318                             }
319                         }
320 
321                         if (done) {
322                             return true;
323                         } else {
324                             waitTime = timeoutNanos - (System.nanoTime() - startTime);
325                             if (waitTime <= 0) {
326                                 return done;
327                             }
328                         }
329                     }
330                 } finally {
331                     waiters--;
332                 }
333             }
334         } finally {
335             if (interrupted) {
336                 Thread.currentThread().interrupt();
337             }
338         }
339     }
340 
341     private static void checkDeadLock() {
342         if (isUseDeadLockChecker() && DeadLockProofWorker.PARENT.get() != null) {
343             throw new IllegalStateException(
344                     "await*() in I/O thread causes a dead lock or " +
345                     "sudden performance drop. Use addListener() instead or " +
346                     "call await*() from a different thread.");
347         }
348     }
349 
350     public boolean setSuccess() {
351         synchronized (this) {
352             // Allow only once.
353             if (done) {
354                 return false;
355             }
356 
357             done = true;
358             if (waiters > 0) {
359                 notifyAll();
360             }
361         }
362 
363         notifyListeners();
364         return true;
365     }
366 
367     public boolean setFailure(Throwable cause) {
368         synchronized (this) {
369             // Allow only once.
370             if (done) {
371                 return false;
372             }
373 
374             this.cause = cause;
375             done = true;
376             if (waiters > 0) {
377                 notifyAll();
378             }
379         }
380 
381         notifyListeners();
382         return true;
383     }
384 
385     public boolean cancel() {
386         if (!cancellable) {
387             return false;
388         }
389 
390         synchronized (this) {
391             // Allow only once.
392             if (done) {
393                 return false;
394             }
395 
396             cause = CANCELLED;
397             done = true;
398             if (waiters > 0) {
399                 notifyAll();
400             }
401         }
402 
403         notifyListeners();
404         return true;
405     }
406 
407     private void notifyListeners() {
408         // This method doesn't need synchronization because:
409         // 1) This method is always called after synchronized (this) block.
410         //    Hence any listener list modification happens-before this method.
411         // 2) This method is called only when 'done' is true.  Once 'done'
412         //    becomes true, the listener list is never modified - see add/removeListener()
413         if (firstListener != null) {
414             notifyListener(firstListener);
415             firstListener = null;
416 
417             if (otherListeners != null) {
418                 for (ChannelFutureListener l: otherListeners) {
419                     notifyListener(l);
420                 }
421                 otherListeners = null;
422             }
423         }
424     }
425 
426     private void notifyListener(ChannelFutureListener l) {
427         try {
428             l.operationComplete(this);
429         } catch (Throwable t) {
430             if (logger.isWarnEnabled()) {
431                 logger.warn(
432                         "An exception was thrown by " +
433                         ChannelFutureListener.class.getSimpleName() + '.', t);
434             }
435         }
436     }
437 
438     public boolean setProgress(long amount, long current, long total) {
439         ChannelFutureProgressListener[] plisteners;
440         synchronized (this) {
441             // Do not generate progress event after completion.
442             if (done) {
443                 return false;
444             }
445 
446             Collection<ChannelFutureProgressListener> progressListeners =
447                 this.progressListeners;
448             if (progressListeners == null || progressListeners.isEmpty()) {
449                 // Nothing to notify - no need to create an empty array.
450                 return true;
451             }
452 
453             plisteners = progressListeners.toArray(
454                     new ChannelFutureProgressListener[progressListeners.size()]);
455         }
456 
457         for (ChannelFutureProgressListener pl: plisteners) {
458             notifyProgressListener(pl, amount, current, total);
459         }
460 
461         return true;
462     }
463 
464     private void notifyProgressListener(
465             ChannelFutureProgressListener l,
466             long amount, long current, long total) {
467 
468         try {
469             l.operationProgressed(this, amount, current, total);
470         } catch (Throwable t) {
471             if (logger.isWarnEnabled()) {
472                 logger.warn(
473                         "An exception was thrown by " +
474                         ChannelFutureProgressListener.class.getSimpleName() + '.', t);
475             }
476         }
477     }
478 }