View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.group;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ServerChannel;
24  import io.netty.util.ReferenceCountUtil;
25  import io.netty.util.concurrent.EventExecutor;
26  import io.netty.util.internal.ConcurrentSet;
27  import io.netty.util.internal.StringUtil;
28  
29  import java.util.AbstractSet;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.Iterator;
33  import java.util.LinkedHashMap;
34  import java.util.Map;
35  import java.util.concurrent.atomic.AtomicInteger;
36  
37  /**
38   * The default {@link ChannelGroup} implementation.
39   */
40  public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
41  
42      private static final AtomicInteger nextId = new AtomicInteger();
43      private final String name;
44      private final EventExecutor executor;
45      private final ConcurrentSet<Channel> serverChannels = new ConcurrentSet<Channel>();
46      private final ConcurrentSet<Channel> nonServerChannels = new ConcurrentSet<Channel>();
47      private final ChannelFutureListener remover = new ChannelFutureListener() {
48          @Override
49          public void operationComplete(ChannelFuture future) throws Exception {
50              remove(future.channel());
51          }
52      };
53      private final boolean stayClosed;
54      private volatile boolean closed;
55  
56      /**
57       * Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
58       * {@link ChannelGroupFuture}s.
59       */
60      public DefaultChannelGroup(EventExecutor executor) {
61          this(executor, false);
62      }
63  
64      /**
65       * Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
66       * {@link ChannelGroupFuture}s.  Please note that different groups can have the same name, which means no
67       * duplicate check is done against group names.
68       */
69      public DefaultChannelGroup(String name, EventExecutor executor) {
70          this(name, executor, false);
71      }
72  
73      /**
74       * Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
75       * {@link ChannelGroupFuture}s. {@code stayClosed} defines whether or not, this group can be closed
76       * more than once. Adding channels to a closed group will immediately close them, too. This makes it
77       * easy, to shutdown server and child channels at once.
78       */
79      public DefaultChannelGroup(EventExecutor executor, boolean stayClosed) {
80          this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor, stayClosed);
81      }
82  
83      /**
84       * Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
85       * {@link ChannelGroupFuture}s. {@code stayClosed} defines whether or not, this group can be closed
86       * more than once. Adding channels to a closed group will immediately close them, too. This makes it
87       * easy, to shutdown server and child channels at once. Please note that different groups can have
88       * the same name, which means no duplicate check is done against group names.
89       */
90      public DefaultChannelGroup(String name, EventExecutor executor, boolean stayClosed) {
91          if (name == null) {
92              throw new NullPointerException("name");
93          }
94          this.name = name;
95          this.executor = executor;
96          this.stayClosed = stayClosed;
97      }
98  
99      @Override
100     public String name() {
101         return name;
102     }
103 
104     @Override
105     public boolean isEmpty() {
106         return nonServerChannels.isEmpty() && serverChannels.isEmpty();
107     }
108 
109     @Override
110     public int size() {
111         return nonServerChannels.size() + serverChannels.size();
112     }
113 
114     @Override
115     public boolean contains(Object o) {
116         if (o instanceof Channel) {
117             Channel c = (Channel) o;
118             if (o instanceof ServerChannel) {
119                 return serverChannels.contains(c);
120             } else {
121                 return nonServerChannels.contains(c);
122             }
123         } else {
124             return false;
125         }
126     }
127 
128     @Override
129     public boolean add(Channel channel) {
130         ConcurrentSet<Channel> set =
131             channel instanceof ServerChannel? serverChannels : nonServerChannels;
132 
133         boolean added = set.add(channel);
134         if (added) {
135             channel.closeFuture().addListener(remover);
136         }
137 
138         if (stayClosed && closed) {
139 
140             // First add channel, than check if closed.
141             // Seems inefficient at first, but this way a volatile
142             // gives us enough synchronization to be thread-safe.
143             //
144             // If true: Close right away.
145             // (Might be closed a second time by ChannelGroup.close(), but this is ok)
146             //
147             // If false: Channel will definitely be closed by the ChannelGroup.
148             // (Because closed=true always happens-before ChannelGroup.close())
149             //
150             // See https://github.com/netty/netty/issues/4020
151             channel.close();
152         }
153 
154         return added;
155     }
156 
157     @Override
158     public boolean remove(Object o) {
159         if (!(o instanceof Channel)) {
160             return false;
161         }
162         boolean removed;
163         Channel c = (Channel) o;
164         if (c instanceof ServerChannel) {
165             removed = serverChannels.remove(c);
166         } else {
167             removed = nonServerChannels.remove(c);
168         }
169         if (!removed) {
170             return false;
171         }
172 
173         c.closeFuture().removeListener(remover);
174         return true;
175     }
176 
177     @Override
178     public void clear() {
179         nonServerChannels.clear();
180         serverChannels.clear();
181     }
182 
183     @Override
184     public Iterator<Channel> iterator() {
185         return new CombinedIterator<Channel>(
186                 serverChannels.iterator(),
187                 nonServerChannels.iterator());
188     }
189 
190     @Override
191     public Object[] toArray() {
192         Collection<Channel> channels = new ArrayList<Channel>(size());
193         channels.addAll(serverChannels);
194         channels.addAll(nonServerChannels);
195         return channels.toArray();
196     }
197 
198     @Override
199     public <T> T[] toArray(T[] a) {
200         Collection<Channel> channels = new ArrayList<Channel>(size());
201         channels.addAll(serverChannels);
202         channels.addAll(nonServerChannels);
203         return channels.toArray(a);
204     }
205 
206     @Override
207     public ChannelGroupFuture close() {
208         return close(ChannelMatchers.all());
209     }
210 
211     @Override
212     public ChannelGroupFuture disconnect() {
213         return disconnect(ChannelMatchers.all());
214     }
215 
216     @Override
217     public ChannelGroupFuture deregister() {
218         return deregister(ChannelMatchers.all());
219     }
220 
221     @Override
222     public ChannelGroupFuture write(Object message) {
223         return write(message, ChannelMatchers.all());
224     }
225 
226     // Create a safe duplicate of the message to write it to a channel but not affect other writes.
227     // See https://github.com/netty/netty/issues/1461
228     private static Object safeDuplicate(Object message) {
229         if (message instanceof ByteBuf) {
230             return ((ByteBuf) message).duplicate().retain();
231         } else if (message instanceof ByteBufHolder) {
232             return ((ByteBufHolder) message).duplicate().retain();
233         } else {
234             return ReferenceCountUtil.retain(message);
235         }
236     }
237 
238     @Override
239     public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
240         if (message == null) {
241             throw new NullPointerException("message");
242         }
243         if (matcher == null) {
244             throw new NullPointerException("matcher");
245         }
246 
247         Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
248         for (Channel c: nonServerChannels) {
249             if (matcher.matches(c)) {
250                 futures.put(c, c.write(safeDuplicate(message)));
251             }
252         }
253 
254         ReferenceCountUtil.release(message);
255         return new DefaultChannelGroupFuture(this, futures, executor);
256     }
257 
258     @Override
259     public ChannelGroup flush() {
260         return flush(ChannelMatchers.all());
261     }
262 
263     @Override
264     public ChannelGroupFuture flushAndWrite(Object message) {
265         return writeAndFlush(message);
266     }
267 
268     @Override
269     public ChannelGroupFuture writeAndFlush(Object message) {
270         return writeAndFlush(message, ChannelMatchers.all());
271     }
272 
273     @Override
274     public ChannelGroupFuture disconnect(ChannelMatcher matcher) {
275         if (matcher == null) {
276             throw new NullPointerException("matcher");
277         }
278 
279         Map<Channel, ChannelFuture> futures =
280                 new LinkedHashMap<Channel, ChannelFuture>(size());
281 
282         for (Channel c: serverChannels) {
283             if (matcher.matches(c)) {
284                 futures.put(c, c.disconnect());
285             }
286         }
287         for (Channel c: nonServerChannels) {
288             if (matcher.matches(c)) {
289                 futures.put(c, c.disconnect());
290             }
291         }
292 
293         return new DefaultChannelGroupFuture(this, futures, executor);
294     }
295 
296     @Override
297     public ChannelGroupFuture close(ChannelMatcher matcher) {
298         if (matcher == null) {
299             throw new NullPointerException("matcher");
300         }
301 
302         Map<Channel, ChannelFuture> futures =
303                 new LinkedHashMap<Channel, ChannelFuture>(size());
304 
305         if (stayClosed) {
306             // It is important to set the closed to true, before closing channels.
307             // Our invariants are:
308             // closed=true happens-before ChannelGroup.close()
309             // ChannelGroup.add() happens-before checking closed==true
310             //
311             // See https://github.com/netty/netty/issues/4020
312             closed = true;
313         }
314 
315         for (Channel c: serverChannels) {
316             if (matcher.matches(c)) {
317                 futures.put(c, c.close());
318             }
319         }
320         for (Channel c: nonServerChannels) {
321             if (matcher.matches(c)) {
322                 futures.put(c, c.close());
323             }
324         }
325 
326         return new DefaultChannelGroupFuture(this, futures, executor);
327     }
328 
329     @Override
330     public ChannelGroupFuture deregister(ChannelMatcher matcher) {
331         if (matcher == null) {
332             throw new NullPointerException("matcher");
333         }
334 
335         Map<Channel, ChannelFuture> futures =
336                 new LinkedHashMap<Channel, ChannelFuture>(size());
337 
338         for (Channel c: serverChannels) {
339             if (matcher.matches(c)) {
340                 futures.put(c, c.deregister());
341             }
342         }
343         for (Channel c: nonServerChannels) {
344             if (matcher.matches(c)) {
345                 futures.put(c, c.deregister());
346             }
347         }
348 
349         return new DefaultChannelGroupFuture(this, futures, executor);
350     }
351 
352     @Override
353     public ChannelGroup flush(ChannelMatcher matcher) {
354         for (Channel c: nonServerChannels) {
355             if (matcher.matches(c)) {
356                 c.flush();
357             }
358         }
359         return this;
360     }
361 
362     @Override
363     public ChannelGroupFuture flushAndWrite(Object message, ChannelMatcher matcher) {
364         return writeAndFlush(message, matcher);
365     }
366 
367     @Override
368     public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) {
369         if (message == null) {
370             throw new NullPointerException("message");
371         }
372 
373         Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
374 
375         for (Channel c: nonServerChannels) {
376             if (matcher.matches(c)) {
377                 futures.put(c, c.writeAndFlush(safeDuplicate(message)));
378             }
379         }
380 
381         ReferenceCountUtil.release(message);
382 
383         return new DefaultChannelGroupFuture(this, futures, executor);
384     }
385 
386     /**
387      * Returns the {@link ChannelGroupFuture} which will be notified when all {@link Channel}s that are part of this
388      * {@link ChannelGroup}, at the time of calling, are closed.
389      */
390     public ChannelGroupFuture newCloseFuture() {
391         return newCloseFuture(ChannelMatchers.all());
392     }
393 
394     /**
395      * Returns the {@link ChannelGroupFuture} which will be notified when all {@link Channel}s that are part of this
396      * {@link ChannelGroup}, at the time of calling, are closed.
397      */
398     public ChannelGroupFuture newCloseFuture(ChannelMatcher matcher) {
399         Map<Channel, ChannelFuture> futures =
400                 new LinkedHashMap<Channel, ChannelFuture>(size());
401 
402         for (Channel c: serverChannels) {
403             if (matcher.matches(c)) {
404                 futures.put(c, c.closeFuture());
405             }
406         }
407         for (Channel c: nonServerChannels) {
408             if (matcher.matches(c)) {
409                 futures.put(c, c.closeFuture());
410             }
411         }
412 
413         return new DefaultChannelGroupFuture(this, futures, executor);
414     }
415 
416     @Override
417     public int hashCode() {
418         return System.identityHashCode(this);
419     }
420 
421     @Override
422     public boolean equals(Object o) {
423         return this == o;
424     }
425 
426     @Override
427     public int compareTo(ChannelGroup o) {
428         int v = name().compareTo(o.name());
429         if (v != 0) {
430             return v;
431         }
432 
433         return System.identityHashCode(this) - System.identityHashCode(o);
434     }
435 
436     @Override
437     public String toString() {
438         return StringUtil.simpleClassName(this) + "(name: " + name() + ", size: " + size() + ')';
439     }
440 }