/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package org.jboss.netty.channel.group;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.ChannelFutureListener;
21  import org.jboss.netty.logging.InternalLogger;
22  import org.jboss.netty.logging.InternalLoggerFactory;
23  import org.jboss.netty.util.internal.DeadLockProofWorker;
24  
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.Iterator;
29  import java.util.LinkedHashMap;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.concurrent.TimeUnit;
33  
34  import static java.util.concurrent.TimeUnit.*;
35  
36  /**
37   * The default {@link ChannelGroupFuture} implementation.
38   */
39  public class DefaultChannelGroupFuture implements ChannelGroupFuture {
40  
41      private static final InternalLogger logger =
42          InternalLoggerFactory.getInstance(DefaultChannelGroupFuture.class);
43  
44      private final ChannelGroup group;
45      final Map<Integer, ChannelFuture> futures;
46      private ChannelGroupFutureListener firstListener;
47      private List<ChannelGroupFutureListener> otherListeners;
48      private boolean done;
49      int successCount;
50      int failureCount;
51      private int waiters;
52  
53      private final ChannelFutureListener childListener = new ChannelFutureListener() {
54          public void operationComplete(ChannelFuture future) throws Exception {
55              boolean success = future.isSuccess();
56              boolean callSetDone;
57              synchronized (DefaultChannelGroupFuture.this) {
58                  if (success) {
59                      successCount ++;
60                  } else {
61                      failureCount ++;
62                  }
63  
64                  callSetDone = successCount + failureCount == futures.size();
65                  assert successCount + failureCount <= futures.size();
66              }
67  
68              if (callSetDone) {
69                  setDone();
70              }
71          }
72      };
73  
74      /**
75       * Creates a new instance.
76       */
77      public DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures) {
78          if (group == null) {
79              throw new NullPointerException("group");
80          }
81          if (futures == null) {
82              throw new NullPointerException("futures");
83          }
84  
85          this.group = group;
86  
87          Map<Integer, ChannelFuture> futureMap = new LinkedHashMap<Integer, ChannelFuture>();
88          for (ChannelFuture f: futures) {
89              futureMap.put(f.getChannel().getId(), f);
90          }
91  
92          this.futures = Collections.unmodifiableMap(futureMap);
93  
94          for (ChannelFuture f: this.futures.values()) {
95              f.addListener(childListener);
96          }
97  
98          // Done on arrival?
99          if (this.futures.isEmpty()) {
100             setDone();
101         }
102     }
103 
104     DefaultChannelGroupFuture(ChannelGroup group, Map<Integer, ChannelFuture> futures) {
105         this.group = group;
106         this.futures = Collections.unmodifiableMap(futures);
107         for (ChannelFuture f: this.futures.values()) {
108             f.addListener(childListener);
109         }
110 
111         // Done on arrival?
112         if (this.futures.isEmpty()) {
113             setDone();
114         }
115     }
116 
117     public ChannelGroup getGroup() {
118         return group;
119     }
120 
121     public ChannelFuture find(Integer channelId) {
122         return futures.get(channelId);
123     }
124 
125     public ChannelFuture find(Channel channel) {
126         return futures.get(channel.getId());
127     }
128 
129     public Iterator<ChannelFuture> iterator() {
130         return futures.values().iterator();
131     }
132 
133     public synchronized boolean isDone() {
134         return done;
135     }
136 
137     public synchronized boolean isCompleteSuccess() {
138         return successCount == futures.size();
139     }
140 
141     public synchronized boolean isPartialSuccess() {
142         return successCount != 0 && successCount != futures.size();
143     }
144 
145     public synchronized boolean isPartialFailure() {
146         return failureCount != 0 && failureCount != futures.size();
147     }
148 
149     public synchronized boolean isCompleteFailure() {
150         int futureCnt = futures.size();
151         return futureCnt != 0 && failureCount == futureCnt;
152     }
153 
154     public void addListener(ChannelGroupFutureListener listener) {
155         if (listener == null) {
156             throw new NullPointerException("listener");
157         }
158 
159         boolean notifyNow = false;
160         synchronized (this) {
161             if (done) {
162                 notifyNow = true;
163             } else {
164                 if (firstListener == null) {
165                     firstListener = listener;
166                 } else {
167                     if (otherListeners == null) {
168                         otherListeners = new ArrayList<ChannelGroupFutureListener>(1);
169                     }
170                     otherListeners.add(listener);
171                 }
172             }
173         }
174 
175         if (notifyNow) {
176             notifyListener(listener);
177         }
178     }
179 
180     public void removeListener(ChannelGroupFutureListener listener) {
181         if (listener == null) {
182             throw new NullPointerException("listener");
183         }
184 
185         synchronized (this) {
186             if (!done) {
187                 if (listener == firstListener) {
188                     if (otherListeners != null && !otherListeners.isEmpty()) {
189                         firstListener = otherListeners.remove(0);
190                     } else {
191                         firstListener = null;
192                     }
193                 } else if (otherListeners != null) {
194                     otherListeners.remove(listener);
195                 }
196             }
197         }
198     }
199 
200     public ChannelGroupFuture await() throws InterruptedException {
201         if (Thread.interrupted()) {
202             throw new InterruptedException();
203         }
204 
205         synchronized (this) {
206             while (!done) {
207                 checkDeadLock();
208                 waiters++;
209                 try {
210                     wait();
211                 } finally {
212                     waiters--;
213                 }
214             }
215         }
216         return this;
217     }
218 
219     public boolean await(long timeout, TimeUnit unit)
220             throws InterruptedException {
221         return await0(unit.toNanos(timeout), true);
222     }
223 
224     public boolean await(long timeoutMillis) throws InterruptedException {
225         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
226     }
227 
228     public ChannelGroupFuture awaitUninterruptibly() {
229         boolean interrupted = false;
230         synchronized (this) {
231             while (!done) {
232                 checkDeadLock();
233                 waiters++;
234                 try {
235                     wait();
236                 } catch (InterruptedException e) {
237                     interrupted = true;
238                 } finally {
239                     waiters--;
240                 }
241             }
242         }
243 
244         if (interrupted) {
245             Thread.currentThread().interrupt();
246         }
247 
248         return this;
249     }
250 
251     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
252         try {
253             return await0(unit.toNanos(timeout), false);
254         } catch (InterruptedException e) {
255             throw new InternalError();
256         }
257     }
258 
259     public boolean awaitUninterruptibly(long timeoutMillis) {
260         try {
261             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
262         } catch (InterruptedException e) {
263             throw new InternalError();
264         }
265     }
266 
267     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
268         if (interruptable && Thread.interrupted()) {
269             throw new InterruptedException();
270         }
271 
272         long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
273         long waitTime = timeoutNanos;
274         boolean interrupted = false;
275 
276         try {
277             synchronized (this) {
278                 if (done || waitTime <= 0) {
279                     return done;
280                 }
281 
282                 checkDeadLock();
283                 waiters++;
284                 try {
285                     for (;;) {
286                         try {
287                             wait(waitTime / 1000000, (int) (waitTime % 1000000));
288                         } catch (InterruptedException e) {
289                             if (interruptable) {
290                                 throw e;
291                             } else {
292                                 interrupted = true;
293                             }
294                         }
295 
296                         if (done) {
297                             return true;
298                         } else {
299                             waitTime = timeoutNanos - (System.nanoTime() - startTime);
300                             if (waitTime <= 0) {
301                                 return done;
302                             }
303                         }
304                     }
305                 } finally {
306                     waiters--;
307                 }
308             }
309         } finally {
310             if (interrupted) {
311                 Thread.currentThread().interrupt();
312             }
313         }
314     }
315 
316     private static void checkDeadLock() {
317         if (DeadLockProofWorker.PARENT.get() != null) {
318             throw new IllegalStateException(
319                     "await*() in I/O thread causes a dead lock or " +
320                     "sudden performance drop. Use addListener() instead or " +
321                     "call await*() from a different thread.");
322         }
323     }
324 
325     boolean setDone() {
326         synchronized (this) {
327             // Allow only once.
328             if (done) {
329                 return false;
330             }
331 
332             done = true;
333             if (waiters > 0) {
334                 notifyAll();
335             }
336         }
337 
338         notifyListeners();
339         return true;
340     }
341 
342     private void notifyListeners() {
343         // This method doesn't need synchronization because:
344         // 1) This method is always called after synchronized (this) block.
345         //    Hence any listener list modification happens-before this method.
346         // 2) This method is called only when 'done' is true.  Once 'done'
347         //    becomes true, the listener list is never modified - see add/removeListener()
348         if (firstListener != null) {
349             notifyListener(firstListener);
350             firstListener = null;
351 
352             if (otherListeners != null) {
353                 for (ChannelGroupFutureListener l: otherListeners) {
354                     notifyListener(l);
355                 }
356                 otherListeners = null;
357             }
358         }
359     }
360 
361     private void notifyListener(ChannelGroupFutureListener l) {
362         try {
363             l.operationComplete(this);
364         } catch (Throwable t) {
365             if (logger.isWarnEnabled()) {
366                 logger.warn(
367                         "An exception was thrown by " +
368                         ChannelFutureListener.class.getSimpleName() + '.', t);
369             }
370         }
371     }
372 }