View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.group;
17  
18  import static java.util.concurrent.TimeUnit.*;
19  
20  import java.util.ArrayList;
21  import java.util.Collection;
22  import java.util.Collections;
23  import java.util.Iterator;
24  import java.util.LinkedHashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.jboss.netty.channel.Channel;
30  import org.jboss.netty.channel.ChannelFuture;
31  import org.jboss.netty.channel.ChannelFutureListener;
32  import org.jboss.netty.logging.InternalLogger;
33  import org.jboss.netty.logging.InternalLoggerFactory;
34  import org.jboss.netty.util.internal.DeadLockProofWorker;
35  
36  /**
37   * The default {@link ChannelGroupFuture} implementation.
38   */
39  public class DefaultChannelGroupFuture implements ChannelGroupFuture {
40  
41      private static final InternalLogger logger =
42          InternalLoggerFactory.getInstance(DefaultChannelGroupFuture.class);
43  
44      private final ChannelGroup group;
45      final Map<Integer, ChannelFuture> futures;
46      private ChannelGroupFutureListener firstListener;
47      private List<ChannelGroupFutureListener> otherListeners;
48      private boolean done;
49      int successCount;
50      int failureCount;
51      private int waiters;
52  
53      private final ChannelFutureListener childListener = new ChannelFutureListener() {
54          public void operationComplete(ChannelFuture future) throws Exception {
55              boolean success = future.isSuccess();
56              boolean callSetDone = false;
57              synchronized (DefaultChannelGroupFuture.this) {
58                  if (success) {
59                      successCount ++;
60                  } else {
61                      failureCount ++;
62                  }
63  
64                  callSetDone = successCount + failureCount == futures.size();
65                  assert successCount + failureCount <= futures.size();
66              }
67  
68              if (callSetDone) {
69                  setDone();
70              }
71          }
72      };
73  
74      /**
75       * Creates a new instance.
76       */
77      public DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures) {
78          if (group == null) {
79              throw new NullPointerException("group");
80          }
81          if (futures == null) {
82              throw new NullPointerException("futures");
83          }
84  
85          this.group = group;
86  
87          Map<Integer, ChannelFuture> futureMap = new LinkedHashMap<Integer, ChannelFuture>();
88          for (ChannelFuture f: futures) {
89              futureMap.put(f.getChannel().getId(), f);
90          }
91  
92          this.futures = Collections.unmodifiableMap(futureMap);
93  
94          for (ChannelFuture f: this.futures.values()) {
95              f.addListener(childListener);
96          }
97  
98          // Done on arrival?
99          if (this.futures.isEmpty()) {
100             setDone();
101         }
102     }
103 
104     DefaultChannelGroupFuture(ChannelGroup group, Map<Integer, ChannelFuture> futures) {
105         this.group = group;
106         this.futures = Collections.unmodifiableMap(futures);
107         for (ChannelFuture f: this.futures.values()) {
108             f.addListener(childListener);
109         }
110 
111         // Done on arrival?
112         if (this.futures.isEmpty()) {
113             setDone();
114         }
115     }
116 
117     public ChannelGroup getGroup() {
118         return group;
119     }
120 
121     public ChannelFuture find(Integer channelId) {
122         return futures.get(channelId);
123     }
124 
125     public ChannelFuture find(Channel channel) {
126         return futures.get(channel.getId());
127     }
128 
129     public Iterator<ChannelFuture> iterator() {
130         return futures.values().iterator();
131     }
132 
133     public synchronized boolean isDone() {
134         return done;
135     }
136 
137     public synchronized boolean isCompleteSuccess() {
138         return successCount == futures.size();
139     }
140 
141     public synchronized boolean isPartialSuccess() {
142         return successCount != 0 && successCount != futures.size();
143     }
144 
145     public synchronized boolean isPartialFailure() {
146         return failureCount != 0 && failureCount != futures.size();
147     }
148 
149     public synchronized boolean isCompleteFailure() {
150         int futureCnt = futures.size();
151         return futureCnt != 0 && failureCount == futureCnt;
152     }
153 
154     public void addListener(ChannelGroupFutureListener listener) {
155         if (listener == null) {
156             throw new NullPointerException("listener");
157         }
158 
159         boolean notifyNow = false;
160         synchronized (this) {
161             if (done) {
162                 notifyNow = true;
163             } else {
164                 if (firstListener == null) {
165                     firstListener = listener;
166                 } else {
167                     if (otherListeners == null) {
168                         otherListeners = new ArrayList<ChannelGroupFutureListener>(1);
169                     }
170                     otherListeners.add(listener);
171                 }
172             }
173         }
174 
175         if (notifyNow) {
176             notifyListener(listener);
177         }
178     }
179 
180     public void removeListener(ChannelGroupFutureListener listener) {
181         if (listener == null) {
182             throw new NullPointerException("listener");
183         }
184 
185         synchronized (this) {
186             if (!done) {
187                 if (listener == firstListener) {
188                     if (otherListeners != null && !otherListeners.isEmpty()) {
189                         firstListener = otherListeners.remove(0);
190                     } else {
191                         firstListener = null;
192                     }
193                 } else if (otherListeners != null) {
194                     otherListeners.remove(listener);
195                 }
196             }
197         }
198     }
199 
200     public ChannelGroupFuture await() throws InterruptedException {
201         if (Thread.interrupted()) {
202             throw new InterruptedException();
203         }
204 
205         synchronized (this) {
206             while (!done) {
207                 checkDeadLock();
208                 waiters++;
209                 try {
210                     wait();
211                 } finally {
212                     waiters--;
213                 }
214             }
215         }
216         return this;
217     }
218 
219     public boolean await(long timeout, TimeUnit unit)
220             throws InterruptedException {
221         return await0(unit.toNanos(timeout), true);
222     }
223 
224     public boolean await(long timeoutMillis) throws InterruptedException {
225         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
226     }
227 
228     public ChannelGroupFuture awaitUninterruptibly() {
229         boolean interrupted = false;
230         synchronized (this) {
231             while (!done) {
232                 checkDeadLock();
233                 waiters++;
234                 try {
235                     wait();
236                 } catch (InterruptedException e) {
237                     interrupted = true;
238                 } finally {
239                     waiters--;
240                 }
241             }
242         }
243 
244         if (interrupted) {
245             Thread.currentThread().interrupt();
246         }
247 
248         return this;
249     }
250 
251     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
252         try {
253             return await0(unit.toNanos(timeout), false);
254         } catch (InterruptedException e) {
255             throw new InternalError();
256         }
257     }
258 
259     public boolean awaitUninterruptibly(long timeoutMillis) {
260         try {
261             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
262         } catch (InterruptedException e) {
263             throw new InternalError();
264         }
265     }
266 
267     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
268         if (interruptable && Thread.interrupted()) {
269             throw new InterruptedException();
270         }
271 
272         long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
273         long waitTime = timeoutNanos;
274         boolean interrupted = false;
275 
276         try {
277             synchronized (this) {
278                 if (done) {
279                     return done;
280                 } else if (waitTime <= 0) {
281                     return done;
282                 }
283 
284                 checkDeadLock();
285                 waiters++;
286                 try {
287                     for (;;) {
288                         try {
289                             wait(waitTime / 1000000, (int) (waitTime % 1000000));
290                         } catch (InterruptedException e) {
291                             if (interruptable) {
292                                 throw e;
293                             } else {
294                                 interrupted = true;
295                             }
296                         }
297 
298                         if (done) {
299                             return true;
300                         } else {
301                             waitTime = timeoutNanos - (System.nanoTime() - startTime);
302                             if (waitTime <= 0) {
303                                 return done;
304                             }
305                         }
306                     }
307                 } finally {
308                     waiters--;
309                 }
310             }
311         } finally {
312             if (interrupted) {
313                 Thread.currentThread().interrupt();
314             }
315         }
316     }
317 
318     private static void checkDeadLock() {
319         if (DeadLockProofWorker.PARENT.get() != null) {
320             throw new IllegalStateException(
321                     "await*() in I/O thread causes a dead lock or " +
322                     "sudden performance drop. Use addListener() instead or " +
323                     "call await*() from a different thread.");
324         }
325     }
326 
327     boolean setDone() {
328         synchronized (this) {
329             // Allow only once.
330             if (done) {
331                 return false;
332             }
333 
334             done = true;
335             if (waiters > 0) {
336                 notifyAll();
337             }
338         }
339 
340         notifyListeners();
341         return true;
342     }
343 
344     private void notifyListeners() {
345         // This method doesn't need synchronization because:
346         // 1) This method is always called after synchronized (this) block.
347         //    Hence any listener list modification happens-before this method.
348         // 2) This method is called only when 'done' is true.  Once 'done'
349         //    becomes true, the listener list is never modified - see add/removeListener()
350         if (firstListener != null) {
351             notifyListener(firstListener);
352             firstListener = null;
353 
354             if (otherListeners != null) {
355                 for (ChannelGroupFutureListener l: otherListeners) {
356                     notifyListener(l);
357                 }
358                 otherListeners = null;
359             }
360         }
361     }
362 
363     private void notifyListener(ChannelGroupFutureListener l) {
364         try {
365             l.operationComplete(this);
366         } catch (Throwable t) {
367             if (logger.isWarnEnabled()) {
368                 logger.warn(
369                         "An exception was thrown by " +
370                         ChannelFutureListener.class.getSimpleName() + '.', t);
371             }
372         }
373     }
374 }