View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel;
17  
18  import org.jboss.netty.logging.InternalLogger;
19  import org.jboss.netty.logging.InternalLoggerFactory;
20  import org.jboss.netty.util.internal.DeadLockProofWorker;
21  
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.concurrent.TimeUnit;
26  
27  import static java.util.concurrent.TimeUnit.*;
28  
29  /**
30   * The default {@link ChannelFuture} implementation.  It is recommended to
31   * use {@link Channels#future(Channel)} and {@link Channels#future(Channel, boolean)}
32   * to create a new {@link ChannelFuture} rather than calling the constructor
33   * explicitly.
34   */
35  public class DefaultChannelFuture implements ChannelFuture {
36  
37      private static final InternalLogger logger =
38          InternalLoggerFactory.getInstance(DefaultChannelFuture.class);
39  
40      private static final Throwable CANCELLED = new Throwable();
41  
42      private static volatile boolean useDeadLockChecker = true;
43      private static boolean disabledDeadLockCheckerOnce;
44  
45      /**
46       * Returns {@code true} if and only if the dead lock checker is enabled.
47       */
48      public static boolean isUseDeadLockChecker() {
49          return useDeadLockChecker;
50      }
51  
52      /**
53       * Enables or disables the dead lock checker.  It is not recommended to
54       * disable the dead lock checker.  Disable it at your own risk!
55       */
56      public static void setUseDeadLockChecker(boolean useDeadLockChecker) {
57          if (!useDeadLockChecker && !disabledDeadLockCheckerOnce) {
58              disabledDeadLockCheckerOnce = true;
59              if (logger.isDebugEnabled()) {
60                  logger.debug(
61                          "The dead lock checker in " +
62                          DefaultChannelFuture.class.getSimpleName() +
63                          " has been disabled as requested at your own risk.");
64              }
65          }
66          DefaultChannelFuture.useDeadLockChecker = useDeadLockChecker;
67      }
68  
69      private final Channel channel;
70      private final boolean cancellable;
71  
72      private ChannelFutureListener firstListener;
73      private List<ChannelFutureListener> otherListeners;
74      private List<ChannelFutureProgressListener> progressListeners;
75      private boolean done;
76      private Throwable cause;
77      private int waiters;
78  
79      /**
80       * Creates a new instance.
81       *
82       * @param channel
83       *        the {@link Channel} associated with this future
84       * @param cancellable
85       *        {@code true} if and only if this future can be canceled
86       */
87      public DefaultChannelFuture(Channel channel, boolean cancellable) {
88          this.channel = channel;
89          this.cancellable = cancellable;
90      }
91  
92      public Channel getChannel() {
93          return channel;
94      }
95  
96      public synchronized boolean isDone() {
97          return done;
98      }
99  
100     public synchronized boolean isSuccess() {
101         return done && cause == null;
102     }
103 
104     public synchronized Throwable getCause() {
105         if (cause != CANCELLED) {
106             return cause;
107         } else {
108             return null;
109         }
110     }
111 
112     public synchronized boolean isCancelled() {
113         return cause == CANCELLED;
114     }
115 
116     public void addListener(ChannelFutureListener listener) {
117         if (listener == null) {
118             throw new NullPointerException("listener");
119         }
120 
121         boolean notifyNow = false;
122         synchronized (this) {
123             if (done) {
124                 notifyNow = true;
125             } else {
126                 if (firstListener == null) {
127                     firstListener = listener;
128                 } else {
129                     if (otherListeners == null) {
130                         otherListeners = new ArrayList<ChannelFutureListener>(1);
131                     }
132                     otherListeners.add(listener);
133                 }
134 
135                 if (listener instanceof ChannelFutureProgressListener) {
136                     if (progressListeners == null) {
137                         progressListeners = new ArrayList<ChannelFutureProgressListener>(1);
138                     }
139                     progressListeners.add((ChannelFutureProgressListener) listener);
140                 }
141             }
142         }
143 
144         if (notifyNow) {
145             notifyListener(listener);
146         }
147     }
148 
149     public void removeListener(ChannelFutureListener listener) {
150         if (listener == null) {
151             throw new NullPointerException("listener");
152         }
153 
154         synchronized (this) {
155             if (!done) {
156                 if (listener == firstListener) {
157                     if (otherListeners != null && !otherListeners.isEmpty()) {
158                         firstListener = otherListeners.remove(0);
159                     } else {
160                         firstListener = null;
161                     }
162                 } else if (otherListeners != null) {
163                     otherListeners.remove(listener);
164                 }
165 
166                 if (listener instanceof ChannelFutureProgressListener) {
167                     progressListeners.remove(listener);
168                 }
169             }
170         }
171     }
172 
173     @Deprecated
174     public ChannelFuture rethrowIfFailed() throws Exception {
175         if (!isDone()) {
176             return this;
177         }
178 
179         Throwable cause = getCause();
180         if (cause == null) {
181             return this;
182         }
183 
184         if (cause instanceof Exception) {
185             throw (Exception) cause;
186         }
187 
188         if (cause instanceof Error) {
189             throw (Error) cause;
190         }
191 
192         throw new RuntimeException(cause);
193     }
194 
195     public ChannelFuture sync() throws InterruptedException {
196         await();
197         rethrowIfFailed0();
198         return this;
199     }
200 
201     public ChannelFuture syncUninterruptibly() {
202         awaitUninterruptibly();
203         rethrowIfFailed0();
204         return this;
205     }
206 
207     private void rethrowIfFailed0() {
208         Throwable cause = getCause();
209         if (cause == null) {
210             return;
211         }
212 
213         if (cause instanceof RuntimeException) {
214             throw (RuntimeException) cause;
215         }
216 
217         if (cause instanceof Error) {
218             throw (Error) cause;
219         }
220 
221         throw new ChannelException(cause);
222     }
223 
224     public ChannelFuture await() throws InterruptedException {
225         if (Thread.interrupted()) {
226             throw new InterruptedException();
227         }
228 
229         synchronized (this) {
230             while (!done) {
231                 checkDeadLock();
232                 waiters++;
233                 try {
234                     wait();
235                 } finally {
236                     waiters--;
237                 }
238             }
239         }
240         return this;
241     }
242 
243     public boolean await(long timeout, TimeUnit unit)
244             throws InterruptedException {
245         return await0(unit.toNanos(timeout), true);
246     }
247 
248     public boolean await(long timeoutMillis) throws InterruptedException {
249         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
250     }
251 
252     public ChannelFuture awaitUninterruptibly() {
253         boolean interrupted = false;
254         synchronized (this) {
255             while (!done) {
256                 checkDeadLock();
257                 waiters++;
258                 try {
259                     wait();
260                 } catch (InterruptedException e) {
261                     interrupted = true;
262                 } finally {
263                     waiters--;
264                 }
265             }
266         }
267 
268         if (interrupted) {
269             Thread.currentThread().interrupt();
270         }
271 
272         return this;
273     }
274 
275     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
276         try {
277             return await0(unit.toNanos(timeout), false);
278         } catch (InterruptedException e) {
279             throw new InternalError();
280         }
281     }
282 
283     public boolean awaitUninterruptibly(long timeoutMillis) {
284         try {
285             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
286         } catch (InterruptedException e) {
287             throw new InternalError();
288         }
289     }
290 
291     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
292         if (interruptable && Thread.interrupted()) {
293             throw new InterruptedException();
294         }
295 
296         long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
297         long waitTime = timeoutNanos;
298         boolean interrupted = false;
299 
300         try {
301             synchronized (this) {
302                 if (done || waitTime <= 0) {
303                     return done;
304                 }
305 
306                 checkDeadLock();
307                 waiters++;
308                 try {
309                     for (;;) {
310                         try {
311                             wait(waitTime / 1000000, (int) (waitTime % 1000000));
312                         } catch (InterruptedException e) {
313                             if (interruptable) {
314                                 throw e;
315                             } else {
316                                 interrupted = true;
317                             }
318                         }
319 
320                         if (done) {
321                             return true;
322                         } else {
323                             waitTime = timeoutNanos - (System.nanoTime() - startTime);
324                             if (waitTime <= 0) {
325                                 return done;
326                             }
327                         }
328                     }
329                 } finally {
330                     waiters--;
331                 }
332             }
333         } finally {
334             if (interrupted) {
335                 Thread.currentThread().interrupt();
336             }
337         }
338     }
339 
340     private static void checkDeadLock() {
341         if (isUseDeadLockChecker() && DeadLockProofWorker.PARENT.get() != null) {
342             throw new IllegalStateException(
343                     "await*() in I/O thread causes a dead lock or " +
344                     "sudden performance drop. Use addListener() instead or " +
345                     "call await*() from a different thread.");
346         }
347     }
348 
349     public boolean setSuccess() {
350         synchronized (this) {
351             // Allow only once.
352             if (done) {
353                 return false;
354             }
355 
356             done = true;
357             if (waiters > 0) {
358                 notifyAll();
359             }
360         }
361 
362         notifyListeners();
363         return true;
364     }
365 
366     public boolean setFailure(Throwable cause) {
367         if (cause == null) {
368             throw new NullPointerException("cause");
369         }
370 
371         synchronized (this) {
372             // Allow only once.
373             if (done) {
374                 return false;
375             }
376 
377             this.cause = cause;
378             done = true;
379             if (waiters > 0) {
380                 notifyAll();
381             }
382         }
383 
384         notifyListeners();
385         return true;
386     }
387 
388     public boolean cancel() {
389         if (!cancellable) {
390             return false;
391         }
392 
393         synchronized (this) {
394             // Allow only once.
395             if (done) {
396                 return false;
397             }
398 
399             cause = CANCELLED;
400             done = true;
401             if (waiters > 0) {
402                 notifyAll();
403             }
404         }
405 
406         notifyListeners();
407         return true;
408     }
409 
410     private void notifyListeners() {
411         // This method doesn't need synchronization because:
412         // 1) This method is always called after synchronized (this) block.
413         //    Hence any listener list modification happens-before this method.
414         // 2) This method is called only when 'done' is true.  Once 'done'
415         //    becomes true, the listener list is never modified - see add/removeListener()
416         if (firstListener != null) {
417             notifyListener(firstListener);
418             firstListener = null;
419 
420             if (otherListeners != null) {
421                 for (ChannelFutureListener l: otherListeners) {
422                     notifyListener(l);
423                 }
424                 otherListeners = null;
425             }
426         }
427     }
428 
429     private void notifyListener(ChannelFutureListener l) {
430         try {
431             l.operationComplete(this);
432         } catch (Throwable t) {
433             if (logger.isWarnEnabled()) {
434                 logger.warn(
435                         "An exception was thrown by " +
436                         ChannelFutureListener.class.getSimpleName() + '.', t);
437             }
438         }
439     }
440 
441     public boolean setProgress(long amount, long current, long total) {
442         ChannelFutureProgressListener[] plisteners;
443         synchronized (this) {
444             // Do not generate progress event after completion.
445             if (done) {
446                 return false;
447             }
448 
449             Collection<ChannelFutureProgressListener> progressListeners =
450                 this.progressListeners;
451             if (progressListeners == null || progressListeners.isEmpty()) {
452                 // Nothing to notify - no need to create an empty array.
453                 return true;
454             }
455 
456             plisteners = progressListeners.toArray(
457                     new ChannelFutureProgressListener[progressListeners.size()]);
458         }
459 
460         for (ChannelFutureProgressListener pl: plisteners) {
461             notifyProgressListener(pl, amount, current, total);
462         }
463 
464         return true;
465     }
466 
467     private void notifyProgressListener(
468             ChannelFutureProgressListener l,
469             long amount, long current, long total) {
470 
471         try {
472             l.operationProgressed(this, amount, current, total);
473         } catch (Throwable t) {
474             if (logger.isWarnEnabled()) {
475                 logger.warn(
476                         "An exception was thrown by " +
477                         ChannelFutureProgressListener.class.getSimpleName() + '.', t);
478             }
479         }
480     }
481 }