View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel;
17  
18  import org.jboss.netty.logging.InternalLogger;
19  import org.jboss.netty.logging.InternalLoggerFactory;
20  import org.jboss.netty.util.internal.DeadLockProofWorker;
21  
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.concurrent.TimeUnit;
26  
27  import static java.util.concurrent.TimeUnit.*;
28  
29  /**
30   * The default {@link ChannelFuture} implementation.  It is recommended to
31   * use {@link Channels#future(Channel)} and {@link Channels#future(Channel, boolean)}
32   * to create a new {@link ChannelFuture} rather than calling the constructor
33   * explicitly.
34   */
35  public class DefaultChannelFuture implements ChannelFuture {
36  
37      private static final InternalLogger logger =
38          InternalLoggerFactory.getInstance(DefaultChannelFuture.class);
39  
40      private static final Throwable CANCELLED = new Throwable();
41  
42      private static volatile boolean useDeadLockChecker = true;
43      private static boolean disabledDeadLockCheckerOnce;
44  
45      /**
46       * Returns {@code true} if and only if the dead lock checker is enabled.
47       */
48      public static boolean isUseDeadLockChecker() {
49          return useDeadLockChecker;
50      }
51  
52      /**
53       * Enables or disables the dead lock checker.  It is not recommended to
54       * disable the dead lock checker.  Disable it at your own risk!
55       */
56      public static void setUseDeadLockChecker(boolean useDeadLockChecker) {
57          if (!useDeadLockChecker && !disabledDeadLockCheckerOnce) {
58              disabledDeadLockCheckerOnce = true;
59              if (logger.isDebugEnabled()) {
60                  logger.debug(
61                          "The dead lock checker in " +
62                          DefaultChannelFuture.class.getSimpleName() +
63                          " has been disabled as requested at your own risk.");
64              }
65          }
66          DefaultChannelFuture.useDeadLockChecker = useDeadLockChecker;
67      }
68  
69      private final Channel channel;
70      private final boolean cancellable;
71  
72      private ChannelFutureListener firstListener;
73      private List<ChannelFutureListener> otherListeners;
74      private List<ChannelFutureProgressListener> progressListeners;
75      private boolean done;
76      private Throwable cause;
77      private int waiters;
78  
79      /**
80       * Creates a new instance.
81       *
82       * @param channel
83       *        the {@link Channel} associated with this future
84       * @param cancellable
85       *        {@code true} if and only if this future can be canceled
86       */
87      public DefaultChannelFuture(Channel channel, boolean cancellable) {
88          this.channel = channel;
89          this.cancellable = cancellable;
90      }
91  
92      public Channel getChannel() {
93          return channel;
94      }
95  
96      public synchronized boolean isDone() {
97          return done;
98      }
99  
100     public synchronized boolean isSuccess() {
101         return done && cause == null;
102     }
103 
104     public synchronized Throwable getCause() {
105         if (cause != CANCELLED) {
106             return cause;
107         } else {
108             return null;
109         }
110     }
111 
112     public synchronized boolean isCancelled() {
113         return cause == CANCELLED;
114     }
115 
116     public void addListener(ChannelFutureListener listener) {
117         if (listener == null) {
118             throw new NullPointerException("listener");
119         }
120 
121         boolean notifyNow = false;
122         synchronized (this) {
123             if (done) {
124                 notifyNow = true;
125             } else {
126                 if (firstListener == null) {
127                     firstListener = listener;
128                 } else {
129                     if (otherListeners == null) {
130                         otherListeners = new ArrayList<ChannelFutureListener>(1);
131                     }
132                     otherListeners.add(listener);
133                 }
134 
135                 if (listener instanceof ChannelFutureProgressListener) {
136                     if (progressListeners == null) {
137                         progressListeners = new ArrayList<ChannelFutureProgressListener>(1);
138                     }
139                     progressListeners.add((ChannelFutureProgressListener) listener);
140                 }
141             }
142         }
143 
144         if (notifyNow) {
145             notifyListener(listener);
146         }
147     }
148 
149     public void removeListener(ChannelFutureListener listener) {
150         if (listener == null) {
151             throw new NullPointerException("listener");
152         }
153 
154         synchronized (this) {
155             if (!done) {
156                 if (listener == firstListener) {
157                     if (otherListeners != null && !otherListeners.isEmpty()) {
158                         firstListener = otherListeners.remove(0);
159                     } else {
160                         firstListener = null;
161                     }
162                 } else if (otherListeners != null) {
163                     otherListeners.remove(listener);
164                 }
165 
166                 if (listener instanceof ChannelFutureProgressListener) {
167                     progressListeners.remove(listener);
168                 }
169             }
170         }
171     }
172 
173     public ChannelFuture sync() throws InterruptedException {
174         await();
175         rethrowIfFailed0();
176         return this;
177     }
178 
179     public ChannelFuture syncUninterruptibly() {
180         awaitUninterruptibly();
181         rethrowIfFailed0();
182         return this;
183     }
184 
185     private void rethrowIfFailed0() {
186         Throwable cause = getCause();
187         if (cause == null) {
188             return;
189         }
190 
191         if (cause instanceof RuntimeException) {
192             throw (RuntimeException) cause;
193         }
194 
195         if (cause instanceof Error) {
196             throw (Error) cause;
197         }
198 
199         throw new ChannelException(cause);
200     }
201 
202     public ChannelFuture await() throws InterruptedException {
203         if (Thread.interrupted()) {
204             throw new InterruptedException();
205         }
206 
207         synchronized (this) {
208             while (!done) {
209                 checkDeadLock();
210                 waiters++;
211                 try {
212                     wait();
213                 } finally {
214                     waiters--;
215                 }
216             }
217         }
218         return this;
219     }
220 
221     public boolean await(long timeout, TimeUnit unit)
222             throws InterruptedException {
223         return await0(unit.toNanos(timeout), true);
224     }
225 
226     public boolean await(long timeoutMillis) throws InterruptedException {
227         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
228     }
229 
230     public ChannelFuture awaitUninterruptibly() {
231         boolean interrupted = false;
232         synchronized (this) {
233             while (!done) {
234                 checkDeadLock();
235                 waiters++;
236                 try {
237                     wait();
238                 } catch (InterruptedException e) {
239                     interrupted = true;
240                 } finally {
241                     waiters--;
242                 }
243             }
244         }
245 
246         if (interrupted) {
247             Thread.currentThread().interrupt();
248         }
249 
250         return this;
251     }
252 
253     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
254         try {
255             return await0(unit.toNanos(timeout), false);
256         } catch (InterruptedException e) {
257             throw new InternalError();
258         }
259     }
260 
261     public boolean awaitUninterruptibly(long timeoutMillis) {
262         try {
263             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
264         } catch (InterruptedException e) {
265             throw new InternalError();
266         }
267     }
268 
269     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
270         if (interruptable && Thread.interrupted()) {
271             throw new InterruptedException();
272         }
273 
274         long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
275         long waitTime = timeoutNanos;
276         boolean interrupted = false;
277 
278         try {
279             synchronized (this) {
280                 if (done || waitTime <= 0) {
281                     return done;
282                 }
283 
284                 checkDeadLock();
285                 waiters++;
286                 try {
287                     for (;;) {
288                         try {
289                             wait(waitTime / 1000000, (int) (waitTime % 1000000));
290                         } catch (InterruptedException e) {
291                             if (interruptable) {
292                                 throw e;
293                             } else {
294                                 interrupted = true;
295                             }
296                         }
297 
298                         if (done) {
299                             return true;
300                         } else {
301                             waitTime = timeoutNanos - (System.nanoTime() - startTime);
302                             if (waitTime <= 0) {
303                                 return done;
304                             }
305                         }
306                     }
307                 } finally {
308                     waiters--;
309                 }
310             }
311         } finally {
312             if (interrupted) {
313                 Thread.currentThread().interrupt();
314             }
315         }
316     }
317 
318     private static void checkDeadLock() {
319         if (isUseDeadLockChecker() && DeadLockProofWorker.PARENT.get() != null) {
320             throw new IllegalStateException(
321                     "await*() in I/O thread causes a dead lock or " +
322                     "sudden performance drop. Use addListener() instead or " +
323                     "call await*() from a different thread.");
324         }
325     }
326 
327     public boolean setSuccess() {
328         synchronized (this) {
329             // Allow only once.
330             if (done) {
331                 return false;
332             }
333 
334             done = true;
335             if (waiters > 0) {
336                 notifyAll();
337             }
338         }
339 
340         notifyListeners();
341         return true;
342     }
343 
344     public boolean setFailure(Throwable cause) {
345         if (cause == null) {
346             throw new NullPointerException("cause");
347         }
348 
349         synchronized (this) {
350             // Allow only once.
351             if (done) {
352                 return false;
353             }
354 
355             this.cause = cause;
356             done = true;
357             if (waiters > 0) {
358                 notifyAll();
359             }
360         }
361 
362         notifyListeners();
363         return true;
364     }
365 
366     public boolean cancel() {
367         if (!cancellable) {
368             return false;
369         }
370 
371         synchronized (this) {
372             // Allow only once.
373             if (done) {
374                 return false;
375             }
376 
377             cause = CANCELLED;
378             done = true;
379             if (waiters > 0) {
380                 notifyAll();
381             }
382         }
383 
384         notifyListeners();
385         return true;
386     }
387 
388     private void notifyListeners() {
389         // This method doesn't need synchronization because:
390         // 1) This method is always called after synchronized (this) block.
391         //    Hence any listener list modification happens-before this method.
392         // 2) This method is called only when 'done' is true.  Once 'done'
393         //    becomes true, the listener list is never modified - see add/removeListener()
394         if (firstListener != null) {
395             notifyListener(firstListener);
396             firstListener = null;
397 
398             if (otherListeners != null) {
399                 for (ChannelFutureListener l: otherListeners) {
400                     notifyListener(l);
401                 }
402                 otherListeners = null;
403             }
404         }
405     }
406 
407     private void notifyListener(ChannelFutureListener l) {
408         try {
409             l.operationComplete(this);
410         } catch (Throwable t) {
411             if (logger.isWarnEnabled()) {
412                 logger.warn(
413                         "An exception was thrown by " +
414                         ChannelFutureListener.class.getSimpleName() + '.', t);
415             }
416         }
417     }
418 
419     public boolean setProgress(long amount, long current, long total) {
420         ChannelFutureProgressListener[] plisteners;
421         synchronized (this) {
422             // Do not generate progress event after completion.
423             if (done) {
424                 return false;
425             }
426 
427             Collection<ChannelFutureProgressListener> progressListeners =
428                 this.progressListeners;
429             if (progressListeners == null || progressListeners.isEmpty()) {
430                 // Nothing to notify - no need to create an empty array.
431                 return true;
432             }
433 
434             plisteners = progressListeners.toArray(
435                     new ChannelFutureProgressListener[progressListeners.size()]);
436         }
437 
438         for (ChannelFutureProgressListener pl: plisteners) {
439             notifyProgressListener(pl, amount, current, total);
440         }
441 
442         return true;
443     }
444 
445     private void notifyProgressListener(
446             ChannelFutureProgressListener l,
447             long amount, long current, long total) {
448 
449         try {
450             l.operationProgressed(this, amount, current, total);
451         } catch (Throwable t) {
452             if (logger.isWarnEnabled()) {
453                 logger.warn(
454                         "An exception was thrown by " +
455                         ChannelFutureProgressListener.class.getSimpleName() + '.', t);
456             }
457         }
458     }
459 }