View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.group;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelId;
24  import io.netty.channel.ServerChannel;
25  import io.netty.util.ReferenceCountUtil;
26  import io.netty.util.concurrent.EventExecutor;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.StringUtil;
29  
30  import java.util.AbstractSet;
31  import java.util.ArrayList;
32  import java.util.Collection;
33  import java.util.Iterator;
34  import java.util.LinkedHashMap;
35  import java.util.Map;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  /**
40   * The default {@link ChannelGroup} implementation.
41   */
42  public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
43  
    // Counter used to generate the default group names ("group-0x" followed by the hex value).
    private static final AtomicInteger nextId = new AtomicInteger();
    // Name returned by name(); also the primary sort key in compareTo().
    private final String name;
    // Executor handed to every DefaultChannelGroupFuture created by this group.
    private final EventExecutor executor;
    // Members are kept in two maps keyed by ChannelId: ServerChannel instances in the first,
    // every other Channel in the second.
    private final ConcurrentMap<ChannelId, Channel> serverChannels = PlatformDependent.newConcurrentHashMap();
    private final ConcurrentMap<ChannelId, Channel> nonServerChannels = PlatformDependent.newConcurrentHashMap();
    // Registered on a member's close future by add(); removes that channel from the group
    // once the future completes.
    private final ChannelFutureListener remover = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            remove(future.channel());
        }
    };
    // Shared future returned by the write methods when they are called with voidPromise == true.
    private final VoidChannelGroupFuture voidFuture = new VoidChannelGroupFuture(this);
    // When true, close(ChannelMatcher) sets the closed flag below.
    private final boolean stayClosed;
    // Set by close(ChannelMatcher) when stayClosed is true; add() closes any channel added afterwards.
    private volatile boolean closed;
58  
59      /**
60       * Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
61       * {@link ChannelGroupFuture}s.
62       */
63      public DefaultChannelGroup(EventExecutor executor) {
64          this(executor, false);
65      }
66  
67      /**
68       * Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
69       * {@link ChannelGroupFuture}s.  Please note that different groups can have the same name, which means no
70       * duplicate check is done against group names.
71       */
72      public DefaultChannelGroup(String name, EventExecutor executor) {
73          this(name, executor, false);
74      }
75  
76      /**
77       * Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
78       * {@link ChannelGroupFuture}s. {@code stayClosed} defines whether or not, this group can be closed
79       * more than once. Adding channels to a closed group will immediately close them, too. This makes it
80       * easy, to shutdown server and child channels at once.
81       */
82      public DefaultChannelGroup(EventExecutor executor, boolean stayClosed) {
83          this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor, stayClosed);
84      }
85  
86      /**
87       * Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
88       * {@link ChannelGroupFuture}s. {@code stayClosed} defines whether or not, this group can be closed
89       * more than once. Adding channels to a closed group will immediately close them, too. This makes it
90       * easy, to shutdown server and child channels at once. Please note that different groups can have
91       * the same name, which means no duplicate check is done against group names.
92       */
93      public DefaultChannelGroup(String name, EventExecutor executor, boolean stayClosed) {
94          if (name == null) {
95              throw new NullPointerException("name");
96          }
97          this.name = name;
98          this.executor = executor;
99          this.stayClosed = stayClosed;
100     }
101 
102     @Override
103     public String name() {
104         return name;
105     }
106 
107     @Override
108     public Channel find(ChannelId id) {
109         Channel c = nonServerChannels.get(id);
110         if (c != null) {
111             return c;
112         } else {
113             return serverChannels.get(id);
114         }
115     }
116 
117     @Override
118     public boolean isEmpty() {
119         return nonServerChannels.isEmpty() && serverChannels.isEmpty();
120     }
121 
122     @Override
123     public int size() {
124         return nonServerChannels.size() + serverChannels.size();
125     }
126 
127     @Override
128     public boolean contains(Object o) {
129         if (o instanceof Channel) {
130             Channel c = (Channel) o;
131             if (o instanceof ServerChannel) {
132                 return serverChannels.containsValue(c);
133             } else {
134                 return nonServerChannels.containsValue(c);
135             }
136         } else {
137             return false;
138         }
139     }
140 
141     @Override
142     public boolean add(Channel channel) {
143         ConcurrentMap<ChannelId, Channel> map =
144             channel instanceof ServerChannel? serverChannels : nonServerChannels;
145 
146         boolean added = map.putIfAbsent(channel.id(), channel) == null;
147         if (added) {
148             channel.closeFuture().addListener(remover);
149         }
150 
151         if (stayClosed && closed) {
152 
153             // First add channel, than check if closed.
154             // Seems inefficient at first, but this way a volatile
155             // gives us enough synchronization to be thread-safe.
156             //
157             // If true: Close right away.
158             // (Might be closed a second time by ChannelGroup.close(), but this is ok)
159             //
160             // If false: Channel will definitely be closed by the ChannelGroup.
161             // (Because closed=true always happens-before ChannelGroup.close())
162             //
163             // See https://github.com/netty/netty/issues/4020
164             channel.close();
165         }
166 
167         return added;
168     }
169 
170     @Override
171     public boolean remove(Object o) {
172         Channel c = null;
173         if (o instanceof ChannelId) {
174             c = nonServerChannels.remove(o);
175             if (c == null) {
176                 c = serverChannels.remove(o);
177             }
178         } else if (o instanceof Channel) {
179             c = (Channel) o;
180             if (c instanceof ServerChannel) {
181                 c = serverChannels.remove(c.id());
182             } else {
183                 c = nonServerChannels.remove(c.id());
184             }
185         }
186 
187         if (c == null) {
188             return false;
189         }
190 
191         c.closeFuture().removeListener(remover);
192         return true;
193     }
194 
195     @Override
196     public void clear() {
197         nonServerChannels.clear();
198         serverChannels.clear();
199     }
200 
201     @Override
202     public Iterator<Channel> iterator() {
203         return new CombinedIterator<Channel>(
204                 serverChannels.values().iterator(),
205                 nonServerChannels.values().iterator());
206     }
207 
208     @Override
209     public Object[] toArray() {
210         Collection<Channel> channels = new ArrayList<Channel>(size());
211         channels.addAll(serverChannels.values());
212         channels.addAll(nonServerChannels.values());
213         return channels.toArray();
214     }
215 
216     @Override
217     public <T> T[] toArray(T[] a) {
218         Collection<Channel> channels = new ArrayList<Channel>(size());
219         channels.addAll(serverChannels.values());
220         channels.addAll(nonServerChannels.values());
221         return channels.toArray(a);
222     }
223 
224     @Override
225     public ChannelGroupFuture close() {
226         return close(ChannelMatchers.all());
227     }
228 
229     @Override
230     public ChannelGroupFuture disconnect() {
231         return disconnect(ChannelMatchers.all());
232     }
233 
234     @Override
235     public ChannelGroupFuture deregister() {
236         return deregister(ChannelMatchers.all());
237     }
238 
239     @Override
240     public ChannelGroupFuture write(Object message) {
241         return write(message, ChannelMatchers.all());
242     }
243 
244     // Create a safe duplicate of the message to write it to a channel but not affect other writes.
245     // See https://github.com/netty/netty/issues/1461
246     private static Object safeDuplicate(Object message) {
247         if (message instanceof ByteBuf) {
248             return ((ByteBuf) message).retainedDuplicate();
249         } else if (message instanceof ByteBufHolder) {
250             return ((ByteBufHolder) message).retainedDuplicate();
251         } else {
252             return ReferenceCountUtil.retain(message);
253         }
254     }
255 
256     @Override
257     public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
258         return write(message, matcher, false);
259     }
260 
261     @Override
262     public ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise) {
263         if (message == null) {
264             throw new NullPointerException("message");
265         }
266         if (matcher == null) {
267             throw new NullPointerException("matcher");
268         }
269 
270         final ChannelGroupFuture future;
271         if (voidPromise) {
272             for (Channel c: nonServerChannels.values()) {
273                 if (matcher.matches(c)) {
274                     c.write(safeDuplicate(message), c.voidPromise());
275                 }
276             }
277             future = voidFuture;
278         } else {
279             Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
280             for (Channel c: nonServerChannels.values()) {
281                 if (matcher.matches(c)) {
282                     futures.put(c, c.write(safeDuplicate(message)));
283                 }
284             }
285             future = new DefaultChannelGroupFuture(this, futures, executor);
286         }
287         ReferenceCountUtil.release(message);
288         return future;
289     }
290 
291     @Override
292     public ChannelGroup flush() {
293         return flush(ChannelMatchers.all());
294     }
295 
296     @Override
297     public ChannelGroupFuture flushAndWrite(Object message) {
298         return writeAndFlush(message);
299     }
300 
301     @Override
302     public ChannelGroupFuture writeAndFlush(Object message) {
303         return writeAndFlush(message, ChannelMatchers.all());
304     }
305 
306     @Override
307     public ChannelGroupFuture disconnect(ChannelMatcher matcher) {
308         if (matcher == null) {
309             throw new NullPointerException("matcher");
310         }
311 
312         Map<Channel, ChannelFuture> futures =
313                 new LinkedHashMap<Channel, ChannelFuture>(size());
314 
315         for (Channel c: serverChannels.values()) {
316             if (matcher.matches(c)) {
317                 futures.put(c, c.disconnect());
318             }
319         }
320         for (Channel c: nonServerChannels.values()) {
321             if (matcher.matches(c)) {
322                 futures.put(c, c.disconnect());
323             }
324         }
325 
326         return new DefaultChannelGroupFuture(this, futures, executor);
327     }
328 
329     @Override
330     public ChannelGroupFuture close(ChannelMatcher matcher) {
331         if (matcher == null) {
332             throw new NullPointerException("matcher");
333         }
334 
335         Map<Channel, ChannelFuture> futures =
336                 new LinkedHashMap<Channel, ChannelFuture>(size());
337 
338         if (stayClosed) {
339             // It is important to set the closed to true, before closing channels.
340             // Our invariants are:
341             // closed=true happens-before ChannelGroup.close()
342             // ChannelGroup.add() happens-before checking closed==true
343             //
344             // See https://github.com/netty/netty/issues/4020
345             closed = true;
346         }
347 
348         for (Channel c: serverChannels.values()) {
349             if (matcher.matches(c)) {
350                 futures.put(c, c.close());
351             }
352         }
353         for (Channel c: nonServerChannels.values()) {
354             if (matcher.matches(c)) {
355                 futures.put(c, c.close());
356             }
357         }
358 
359         return new DefaultChannelGroupFuture(this, futures, executor);
360     }
361 
362     @Override
363     public ChannelGroupFuture deregister(ChannelMatcher matcher) {
364         if (matcher == null) {
365             throw new NullPointerException("matcher");
366         }
367 
368         Map<Channel, ChannelFuture> futures =
369                 new LinkedHashMap<Channel, ChannelFuture>(size());
370 
371         for (Channel c: serverChannels.values()) {
372             if (matcher.matches(c)) {
373                 futures.put(c, c.deregister());
374             }
375         }
376         for (Channel c: nonServerChannels.values()) {
377             if (matcher.matches(c)) {
378                 futures.put(c, c.deregister());
379             }
380         }
381 
382         return new DefaultChannelGroupFuture(this, futures, executor);
383     }
384 
385     @Override
386     public ChannelGroup flush(ChannelMatcher matcher) {
387         for (Channel c: nonServerChannels.values()) {
388             if (matcher.matches(c)) {
389                 c.flush();
390             }
391         }
392         return this;
393     }
394 
395     @Override
396     public ChannelGroupFuture flushAndWrite(Object message, ChannelMatcher matcher) {
397         return writeAndFlush(message, matcher);
398     }
399 
400     @Override
401     public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) {
402         return writeAndFlush(message, matcher, false);
403     }
404 
405     @Override
406     public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise) {
407         if (message == null) {
408             throw new NullPointerException("message");
409         }
410 
411         final ChannelGroupFuture future;
412         if (voidPromise) {
413             for (Channel c: nonServerChannels.values()) {
414                 if (matcher.matches(c)) {
415                     c.writeAndFlush(safeDuplicate(message), c.voidPromise());
416                 }
417             }
418             future = voidFuture;
419         } else {
420             Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
421             for (Channel c: nonServerChannels.values()) {
422                 if (matcher.matches(c)) {
423                     futures.put(c, c.writeAndFlush(safeDuplicate(message)));
424                 }
425             }
426             future = new DefaultChannelGroupFuture(this, futures, executor);
427         }
428         ReferenceCountUtil.release(message);
429         return future;
430     }
431 
432     @Override
433     public ChannelGroupFuture newCloseFuture() {
434         return newCloseFuture(ChannelMatchers.all());
435     }
436 
437     @Override
438     public ChannelGroupFuture newCloseFuture(ChannelMatcher matcher) {
439         Map<Channel, ChannelFuture> futures =
440                 new LinkedHashMap<Channel, ChannelFuture>(size());
441 
442         for (Channel c: serverChannels.values()) {
443             if (matcher.matches(c)) {
444                 futures.put(c, c.closeFuture());
445             }
446         }
447         for (Channel c: nonServerChannels.values()) {
448             if (matcher.matches(c)) {
449                 futures.put(c, c.closeFuture());
450             }
451         }
452 
453         return new DefaultChannelGroupFuture(this, futures, executor);
454     }
455 
456     @Override
457     public int hashCode() {
458         return System.identityHashCode(this);
459     }
460 
461     @Override
462     public boolean equals(Object o) {
463         return this == o;
464     }
465 
466     @Override
467     public int compareTo(ChannelGroup o) {
468         int v = name().compareTo(o.name());
469         if (v != 0) {
470             return v;
471         }
472 
473         return System.identityHashCode(this) - System.identityHashCode(o);
474     }
475 
476     @Override
477     public String toString() {
478         return StringUtil.simpleClassName(this) + "(name: " + name() + ", size: " + size() + ')';
479     }
480 }