View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.group;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelId;
24  import io.netty.channel.ServerChannel;
25  import io.netty.util.ReferenceCountUtil;
26  import io.netty.util.concurrent.EventExecutor;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.StringUtil;
29  
30  import java.util.AbstractSet;
31  import java.util.ArrayList;
32  import java.util.Collection;
33  import java.util.Iterator;
34  import java.util.LinkedHashMap;
35  import java.util.Map;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  /**
40   * The default {@link ChannelGroup} implementation.
41   */
42  public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
43  
44      private static final AtomicInteger nextId = new AtomicInteger();
45      private final String name;
46      private final EventExecutor executor;
47      private final ConcurrentMap<ChannelId, Channel> serverChannels = PlatformDependent.newConcurrentHashMap();
48      private final ConcurrentMap<ChannelId, Channel> nonServerChannels = PlatformDependent.newConcurrentHashMap();
49      private final ChannelFutureListener remover = new ChannelFutureListener() {
50          @Override
51          public void operationComplete(ChannelFuture future) throws Exception {
52              remove(future.channel());
53          }
54      };
55  
56      /**
57       * Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
58       * {@link ChannelGroupFuture}s.
59       */
60      public DefaultChannelGroup(EventExecutor executor) {
61          this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor);
62      }
63  
64      /**
65       * Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
66       * {@link ChannelGroupFuture}s.  Please note that different groups can have the same name, which means no
67       * duplicate check is done against group names.
68       */
69      public DefaultChannelGroup(String name, EventExecutor executor) {
70          if (name == null) {
71              throw new NullPointerException("name");
72          }
73          this.name = name;
74          this.executor = executor;
75      }
76  
77      @Override
78      public String name() {
79          return name;
80      }
81  
82      @Override
83      public Channel find(ChannelId id) {
84          Channel c = nonServerChannels.get(id);
85          if (c != null) {
86              return c;
87          } else {
88              return serverChannels.get(id);
89          }
90      }
91  
92      @Override
93      public boolean isEmpty() {
94          return nonServerChannels.isEmpty() && serverChannels.isEmpty();
95      }
96  
97      @Override
98      public int size() {
99          return nonServerChannels.size() + serverChannels.size();
100     }
101 
102     @Override
103     public boolean contains(Object o) {
104         if (o instanceof Channel) {
105             Channel c = (Channel) o;
106             if (o instanceof ServerChannel) {
107                 return serverChannels.containsValue(c);
108             } else {
109                 return nonServerChannels.containsValue(c);
110             }
111         } else {
112             return false;
113         }
114     }
115 
116     @Override
117     public boolean add(Channel channel) {
118         ConcurrentMap<ChannelId, Channel> map =
119             channel instanceof ServerChannel? serverChannels : nonServerChannels;
120 
121         boolean added = map.putIfAbsent(channel.id(), channel) == null;
122         if (added) {
123             channel.closeFuture().addListener(remover);
124         }
125         return added;
126     }
127 
128     @Override
129     public boolean remove(Object o) {
130         Channel c = null;
131         if (o instanceof ChannelId) {
132             c = nonServerChannels.remove(o);
133             if (c == null) {
134                 c = serverChannels.remove(o);
135             }
136         } else if (o instanceof Channel) {
137             c = (Channel) o;
138             if (c instanceof ServerChannel) {
139                 c = serverChannels.remove(c.id());
140             } else {
141                 c = nonServerChannels.remove(c.id());
142             }
143         }
144 
145         if (c == null) {
146             return false;
147         }
148 
149         c.closeFuture().removeListener(remover);
150         return true;
151     }
152 
153     @Override
154     public void clear() {
155         nonServerChannels.clear();
156         serverChannels.clear();
157     }
158 
159     @Override
160     public Iterator<Channel> iterator() {
161         return new CombinedIterator<Channel>(
162                 serverChannels.values().iterator(),
163                 nonServerChannels.values().iterator());
164     }
165 
166     @Override
167     public Object[] toArray() {
168         Collection<Channel> channels = new ArrayList<Channel>(size());
169         channels.addAll(serverChannels.values());
170         channels.addAll(nonServerChannels.values());
171         return channels.toArray();
172     }
173 
174     @Override
175     public <T> T[] toArray(T[] a) {
176         Collection<Channel> channels = new ArrayList<Channel>(size());
177         channels.addAll(serverChannels.values());
178         channels.addAll(nonServerChannels.values());
179         return channels.toArray(a);
180     }
181 
182     @Override
183     public ChannelGroupFuture close() {
184         return close(ChannelMatchers.all());
185     }
186 
187     @Override
188     public ChannelGroupFuture disconnect() {
189         return disconnect(ChannelMatchers.all());
190     }
191 
192     @Override
193     public ChannelGroupFuture deregister() {
194         return deregister(ChannelMatchers.all());
195     }
196 
197     @Override
198     public ChannelGroupFuture write(Object message) {
199         return write(message, ChannelMatchers.all());
200     }
201 
202     // Create a safe duplicate of the message to write it to a channel but not affect other writes.
203     // See https://github.com/netty/netty/issues/1461
204     private static Object safeDuplicate(Object message) {
205         if (message instanceof ByteBuf) {
206             return ((ByteBuf) message).duplicate().retain();
207         } else if (message instanceof ByteBufHolder) {
208             return ((ByteBufHolder) message).duplicate().retain();
209         } else {
210             return ReferenceCountUtil.retain(message);
211         }
212     }
213 
214     @Override
215     public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
216         if (message == null) {
217             throw new NullPointerException("message");
218         }
219         if (matcher == null) {
220             throw new NullPointerException("matcher");
221         }
222 
223         Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
224         for (Channel c: nonServerChannels.values()) {
225             if (matcher.matches(c)) {
226                 futures.put(c, c.write(safeDuplicate(message)));
227             }
228         }
229 
230         ReferenceCountUtil.release(message);
231         return new DefaultChannelGroupFuture(this, futures, executor);
232     }
233 
234     @Override
235     public ChannelGroup flush() {
236         return flush(ChannelMatchers.all());
237     }
238 
239     @Override
240     public ChannelGroupFuture writeAndFlush(Object message) {
241         return writeAndFlush(message, ChannelMatchers.all());
242     }
243 
244     @Override
245     public ChannelGroupFuture disconnect(ChannelMatcher matcher) {
246         if (matcher == null) {
247             throw new NullPointerException("matcher");
248         }
249 
250         Map<Channel, ChannelFuture> futures =
251                 new LinkedHashMap<Channel, ChannelFuture>(size());
252 
253         for (Channel c: serverChannels.values()) {
254             if (matcher.matches(c)) {
255                 futures.put(c, c.disconnect());
256             }
257         }
258         for (Channel c: nonServerChannels.values()) {
259             if (matcher.matches(c)) {
260                 futures.put(c, c.disconnect());
261             }
262         }
263 
264         return new DefaultChannelGroupFuture(this, futures, executor);
265     }
266 
267     @Override
268     public ChannelGroupFuture close(ChannelMatcher matcher) {
269         if (matcher == null) {
270             throw new NullPointerException("matcher");
271         }
272 
273         Map<Channel, ChannelFuture> futures =
274                 new LinkedHashMap<Channel, ChannelFuture>(size());
275 
276         for (Channel c: serverChannels.values()) {
277             if (matcher.matches(c)) {
278                 futures.put(c, c.close());
279             }
280         }
281         for (Channel c: nonServerChannels.values()) {
282             if (matcher.matches(c)) {
283                 futures.put(c, c.close());
284             }
285         }
286 
287         return new DefaultChannelGroupFuture(this, futures, executor);
288     }
289 
290     @Override
291     public ChannelGroupFuture deregister(ChannelMatcher matcher) {
292         if (matcher == null) {
293             throw new NullPointerException("matcher");
294         }
295 
296         Map<Channel, ChannelFuture> futures =
297                 new LinkedHashMap<Channel, ChannelFuture>(size());
298 
299         for (Channel c: serverChannels.values()) {
300             if (matcher.matches(c)) {
301                 futures.put(c, c.deregister());
302             }
303         }
304         for (Channel c: nonServerChannels.values()) {
305             if (matcher.matches(c)) {
306                 futures.put(c, c.deregister());
307             }
308         }
309 
310         return new DefaultChannelGroupFuture(this, futures, executor);
311     }
312 
313     @Override
314     public ChannelGroup flush(ChannelMatcher matcher) {
315         for (Channel c: nonServerChannels.values()) {
316             if (matcher.matches(c)) {
317                 c.flush();
318             }
319         }
320         return this;
321     }
322 
323     @Override
324     public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) {
325         if (message == null) {
326             throw new NullPointerException("message");
327         }
328 
329         Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
330 
331         for (Channel c: nonServerChannels.values()) {
332             if (matcher.matches(c)) {
333                 futures.put(c, c.writeAndFlush(safeDuplicate(message)));
334             }
335         }
336 
337         ReferenceCountUtil.release(message);
338 
339         return new DefaultChannelGroupFuture(this, futures, executor);
340     }
341 
342     @Override
343     public int hashCode() {
344         return System.identityHashCode(this);
345     }
346 
347     @Override
348     public boolean equals(Object o) {
349         return this == o;
350     }
351 
352     @Override
353     public int compareTo(ChannelGroup o) {
354         int v = name().compareTo(o.name());
355         if (v != 0) {
356             return v;
357         }
358 
359         return System.identityHashCode(this) - System.identityHashCode(o);
360     }
361 
362     @Override
363     public String toString() {
364         return StringUtil.simpleClassName(this) + "(name: " + name() + ", size: " + size() + ')';
365     }
366 }