View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.group;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.util.concurrent.BlockingOperationException;
22  import io.netty.util.concurrent.DefaultPromise;
23  import io.netty.util.concurrent.EventExecutor;
24  import io.netty.util.concurrent.Future;
25  import io.netty.util.concurrent.GenericFutureListener;
26  import io.netty.util.concurrent.ImmediateEventExecutor;
27  
28  import java.util.ArrayList;
29  import java.util.Collection;
30  import java.util.Collections;
31  import java.util.Iterator;
32  import java.util.LinkedHashMap;
33  import java.util.List;
34  import java.util.Map;
35  
36  
37  /**
38   * The default {@link ChannelGroupFuture} implementation.
39   */
40  final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements ChannelGroupFuture {
41  
42      private final ChannelGroup group;
43      private final Map<Channel, ChannelFuture> futures;
44      private int successCount;
45      private int failureCount;
46  
47      private final ChannelFutureListener childListener = new ChannelFutureListener() {
48          @Override
49          public void operationComplete(ChannelFuture future) throws Exception {
50              boolean success = future.isSuccess();
51              boolean callSetDone;
52              synchronized (DefaultChannelGroupFuture.this) {
53                  if (success) {
54                      successCount ++;
55                  } else {
56                      failureCount ++;
57                  }
58  
59                  callSetDone = successCount + failureCount == futures.size();
60                  assert successCount + failureCount <= futures.size();
61              }
62  
63              if (callSetDone) {
64                  if (failureCount > 0) {
65                      List<Map.Entry<Channel, Throwable>> failed =
66                              new ArrayList<Map.Entry<Channel, Throwable>>(failureCount);
67                      for (ChannelFuture f: futures.values()) {
68                          if (!f.isSuccess()) {
69                              failed.add(new DefaultEntry<Channel, Throwable>(f.channel(), f.cause()));
70                          }
71                      }
72                      setFailure0(new ChannelGroupException(failed));
73                  } else {
74                      setSuccess0();
75                  }
76              }
77          }
78      };
79  
80      /**
81       * Creates a new instance.
82       */
83      DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures,  EventExecutor executor) {
84          super(executor);
85          if (group == null) {
86              throw new NullPointerException("group");
87          }
88          if (futures == null) {
89              throw new NullPointerException("futures");
90          }
91  
92          this.group = group;
93  
94          Map<Channel, ChannelFuture> futureMap = new LinkedHashMap<Channel, ChannelFuture>();
95          for (ChannelFuture f: futures) {
96              futureMap.put(f.channel(), f);
97          }
98  
99          this.futures = Collections.unmodifiableMap(futureMap);
100 
101         for (ChannelFuture f: this.futures.values()) {
102             f.addListener(childListener);
103         }
104 
105         // Done on arrival?
106         if (this.futures.isEmpty()) {
107             setSuccess0();
108         }
109     }
110 
111     DefaultChannelGroupFuture(ChannelGroup group, Map<Channel, ChannelFuture> futures, EventExecutor executor) {
112         super(executor);
113         this.group = group;
114         this.futures = Collections.unmodifiableMap(futures);
115         for (ChannelFuture f: this.futures.values()) {
116             f.addListener(childListener);
117         }
118 
119         // Done on arrival?
120         if (this.futures.isEmpty()) {
121             setSuccess0();
122         }
123     }
124 
125     @Override
126     public ChannelGroup group() {
127         return group;
128     }
129 
130     @Override
131     public ChannelFuture find(Channel channel) {
132         return futures.get(channel);
133     }
134 
135     @Override
136     public Iterator<ChannelFuture> iterator() {
137         return futures.values().iterator();
138     }
139 
140     @Override
141     public synchronized boolean isPartialSuccess() {
142         return successCount != 0 && successCount != futures.size();
143     }
144 
145     @Override
146     public synchronized boolean isPartialFailure() {
147         return failureCount != 0 && failureCount != futures.size();
148     }
149 
150     @Override
151     public DefaultChannelGroupFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
152         super.addListener(listener);
153         return this;
154     }
155 
156     @Override
157     public DefaultChannelGroupFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
158         super.addListeners(listeners);
159         return this;
160     }
161 
162     @Override
163     public DefaultChannelGroupFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
164         super.removeListener(listener);
165         return this;
166     }
167 
168     @Override
169     public DefaultChannelGroupFuture removeListeners(
170             GenericFutureListener<? extends Future<? super Void>>... listeners) {
171         super.removeListeners(listeners);
172         return this;
173     }
174 
175     @Override
176     public DefaultChannelGroupFuture await() throws InterruptedException {
177         super.await();
178         return this;
179     }
180 
181     @Override
182     public DefaultChannelGroupFuture awaitUninterruptibly() {
183         super.awaitUninterruptibly();
184         return this;
185     }
186 
187     @Override
188     public DefaultChannelGroupFuture syncUninterruptibly() {
189         super.syncUninterruptibly();
190         return this;
191     }
192 
193     @Override
194     public DefaultChannelGroupFuture sync() throws InterruptedException {
195         super.sync();
196         return this;
197     }
198 
199     @Override
200     public ChannelGroupException cause() {
201         return (ChannelGroupException) super.cause();
202     }
203 
204     private void setSuccess0() {
205         super.setSuccess(null);
206     }
207 
208     private void setFailure0(ChannelGroupException cause) {
209         super.setFailure(cause);
210     }
211 
212     @Override
213     public DefaultChannelGroupFuture setSuccess(Void result) {
214         throw new IllegalStateException();
215     }
216 
217     @Override
218     public boolean trySuccess(Void result) {
219         throw new IllegalStateException();
220     }
221 
222     @Override
223     public DefaultChannelGroupFuture setFailure(Throwable cause) {
224         throw new IllegalStateException();
225     }
226 
227     @Override
228     public boolean tryFailure(Throwable cause) {
229         throw new IllegalStateException();
230     }
231 
232     @Override
233     protected void checkDeadLock() {
234         EventExecutor e = executor();
235         if (e != null && e != ImmediateEventExecutor.INSTANCE && e.inEventLoop()) {
236             throw new BlockingOperationException();
237         }
238     }
239 
240     private static final class DefaultEntry<K, V> implements Map.Entry<K, V> {
241         private final K key;
242         private final V value;
243 
244         DefaultEntry(K key, V value) {
245             this.key = key;
246             this.value = value;
247         }
248 
249         @Override
250         public K getKey() {
251             return key;
252         }
253 
254         @Override
255         public V getValue() {
256             return value;
257         }
258 
259         @Override
260         public V setValue(V value) {
261             throw new UnsupportedOperationException("read-only");
262         }
263     }
264 }