View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.group;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelId;
24  import io.netty.channel.ServerChannel;
25  import io.netty.util.ReferenceCountUtil;
26  import io.netty.util.concurrent.EventExecutor;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.StringUtil;
29  
30  import java.util.AbstractSet;
31  import java.util.ArrayList;
32  import java.util.Collection;
33  import java.util.Iterator;
34  import java.util.LinkedHashMap;
35  import java.util.Map;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  /**
40   * The default {@link ChannelGroup} implementation.
41   */
42  public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
43  
44      private static final AtomicInteger nextId = new AtomicInteger();
45      private final String name;
46      private final EventExecutor executor;
47      private final ConcurrentMap<ChannelId, Channel> serverChannels = PlatformDependent.newConcurrentHashMap();
48      private final ConcurrentMap<ChannelId, Channel> nonServerChannels = PlatformDependent.newConcurrentHashMap();
49      private final ChannelFutureListener remover = new ChannelFutureListener() {
50          @Override
51          public void operationComplete(ChannelFuture future) throws Exception {
52              remove(future.channel());
53          }
54      };
55      private final VoidChannelGroupFuture voidFuture = new VoidChannelGroupFuture(this);
56      private final boolean stayClosed;
57      private volatile boolean closed;
58  
59      /**
60       * Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
61       * {@link ChannelGroupFuture}s.
62       */
63      public DefaultChannelGroup(EventExecutor executor) {
64          this(executor, false);
65      }
66  
67      /**
68       * Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
69       * {@link ChannelGroupFuture}s.  Please note that different groups can have the same name, which means no
70       * duplicate check is done against group names.
71       */
72      public DefaultChannelGroup(String name, EventExecutor executor) {
73          this(name, executor, false);
74      }
75  
76      /**
77       * Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
78       * {@link ChannelGroupFuture}s. {@code stayClosed} defines whether or not, this group can be closed
79       * more than once. Adding channels to a closed group will immediately close them, too. This makes it
80       * easy, to shutdown server and child channels at once.
81       */
82      public DefaultChannelGroup(EventExecutor executor, boolean stayClosed) {
83          this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor, stayClosed);
84      }
85  
86      /**
87       * Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
88       * {@link ChannelGroupFuture}s. {@code stayClosed} defines whether or not, this group can be closed
89       * more than once. Adding channels to a closed group will immediately close them, too. This makes it
90       * easy, to shutdown server and child channels at once. Please note that different groups can have
91       * the same name, which means no duplicate check is done against group names.
92       */
93      public DefaultChannelGroup(String name, EventExecutor executor, boolean stayClosed) {
94          if (name == null) {
95              throw new NullPointerException("name");
96          }
97          this.name = name;
98          this.executor = executor;
99          this.stayClosed = stayClosed;
100     }
101 
102     @Override
103     public String name() {
104         return name;
105     }
106 
107     @Override
108     public Channel find(ChannelId id) {
109         Channel c = nonServerChannels.get(id);
110         if (c != null) {
111             return c;
112         } else {
113             return serverChannels.get(id);
114         }
115     }
116 
117     @Override
118     public boolean isEmpty() {
119         return nonServerChannels.isEmpty() && serverChannels.isEmpty();
120     }
121 
122     @Override
123     public int size() {
124         return nonServerChannels.size() + serverChannels.size();
125     }
126 
127     @Override
128     public boolean contains(Object o) {
129         if (o instanceof ServerChannel) {
130             return serverChannels.containsValue(o);
131         } else if (o instanceof Channel) {
132             return nonServerChannels.containsValue(o);
133         }
134         return false;
135     }
136 
137     @Override
138     public boolean add(Channel channel) {
139         ConcurrentMap<ChannelId, Channel> map =
140             channel instanceof ServerChannel? serverChannels : nonServerChannels;
141 
142         boolean added = map.putIfAbsent(channel.id(), channel) == null;
143         if (added) {
144             channel.closeFuture().addListener(remover);
145         }
146 
147         if (stayClosed && closed) {
148 
149             // First add channel, than check if closed.
150             // Seems inefficient at first, but this way a volatile
151             // gives us enough synchronization to be thread-safe.
152             //
153             // If true: Close right away.
154             // (Might be closed a second time by ChannelGroup.close(), but this is ok)
155             //
156             // If false: Channel will definitely be closed by the ChannelGroup.
157             // (Because closed=true always happens-before ChannelGroup.close())
158             //
159             // See https://github.com/netty/netty/issues/4020
160             channel.close();
161         }
162 
163         return added;
164     }
165 
166     @Override
167     public boolean remove(Object o) {
168         Channel c = null;
169         if (o instanceof ChannelId) {
170             c = nonServerChannels.remove(o);
171             if (c == null) {
172                 c = serverChannels.remove(o);
173             }
174         } else if (o instanceof Channel) {
175             c = (Channel) o;
176             if (c instanceof ServerChannel) {
177                 c = serverChannels.remove(c.id());
178             } else {
179                 c = nonServerChannels.remove(c.id());
180             }
181         }
182 
183         if (c == null) {
184             return false;
185         }
186 
187         c.closeFuture().removeListener(remover);
188         return true;
189     }
190 
191     @Override
192     public void clear() {
193         nonServerChannels.clear();
194         serverChannels.clear();
195     }
196 
197     @Override
198     public Iterator<Channel> iterator() {
199         return new CombinedIterator<Channel>(
200                 serverChannels.values().iterator(),
201                 nonServerChannels.values().iterator());
202     }
203 
204     @Override
205     public Object[] toArray() {
206         Collection<Channel> channels = new ArrayList<Channel>(size());
207         channels.addAll(serverChannels.values());
208         channels.addAll(nonServerChannels.values());
209         return channels.toArray();
210     }
211 
212     @Override
213     public <T> T[] toArray(T[] a) {
214         Collection<Channel> channels = new ArrayList<Channel>(size());
215         channels.addAll(serverChannels.values());
216         channels.addAll(nonServerChannels.values());
217         return channels.toArray(a);
218     }
219 
220     @Override
221     public ChannelGroupFuture close() {
222         return close(ChannelMatchers.all());
223     }
224 
225     @Override
226     public ChannelGroupFuture disconnect() {
227         return disconnect(ChannelMatchers.all());
228     }
229 
230     @Override
231     public ChannelGroupFuture deregister() {
232         return deregister(ChannelMatchers.all());
233     }
234 
235     @Override
236     public ChannelGroupFuture write(Object message) {
237         return write(message, ChannelMatchers.all());
238     }
239 
240     // Create a safe duplicate of the message to write it to a channel but not affect other writes.
241     // See https://github.com/netty/netty/issues/1461
242     private static Object safeDuplicate(Object message) {
243         if (message instanceof ByteBuf) {
244             return ((ByteBuf) message).retainedDuplicate();
245         } else if (message instanceof ByteBufHolder) {
246             return ((ByteBufHolder) message).retainedDuplicate();
247         } else {
248             return ReferenceCountUtil.retain(message);
249         }
250     }
251 
252     @Override
253     public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
254         return write(message, matcher, false);
255     }
256 
257     @Override
258     public ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise) {
259         if (message == null) {
260             throw new NullPointerException("message");
261         }
262         if (matcher == null) {
263             throw new NullPointerException("matcher");
264         }
265 
266         final ChannelGroupFuture future;
267         if (voidPromise) {
268             for (Channel c: nonServerChannels.values()) {
269                 if (matcher.matches(c)) {
270                     c.write(safeDuplicate(message), c.voidPromise());
271                 }
272             }
273             future = voidFuture;
274         } else {
275             Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
276             for (Channel c: nonServerChannels.values()) {
277                 if (matcher.matches(c)) {
278                     futures.put(c, c.write(safeDuplicate(message)));
279                 }
280             }
281             future = new DefaultChannelGroupFuture(this, futures, executor);
282         }
283         ReferenceCountUtil.release(message);
284         return future;
285     }
286 
287     @Override
288     public ChannelGroup flush() {
289         return flush(ChannelMatchers.all());
290     }
291 
292     @Override
293     public ChannelGroupFuture flushAndWrite(Object message) {
294         return writeAndFlush(message);
295     }
296 
297     @Override
298     public ChannelGroupFuture writeAndFlush(Object message) {
299         return writeAndFlush(message, ChannelMatchers.all());
300     }
301 
302     @Override
303     public ChannelGroupFuture disconnect(ChannelMatcher matcher) {
304         if (matcher == null) {
305             throw new NullPointerException("matcher");
306         }
307 
308         Map<Channel, ChannelFuture> futures =
309                 new LinkedHashMap<Channel, ChannelFuture>(size());
310 
311         for (Channel c: serverChannels.values()) {
312             if (matcher.matches(c)) {
313                 futures.put(c, c.disconnect());
314             }
315         }
316         for (Channel c: nonServerChannels.values()) {
317             if (matcher.matches(c)) {
318                 futures.put(c, c.disconnect());
319             }
320         }
321 
322         return new DefaultChannelGroupFuture(this, futures, executor);
323     }
324 
325     @Override
326     public ChannelGroupFuture close(ChannelMatcher matcher) {
327         if (matcher == null) {
328             throw new NullPointerException("matcher");
329         }
330 
331         Map<Channel, ChannelFuture> futures =
332                 new LinkedHashMap<Channel, ChannelFuture>(size());
333 
334         if (stayClosed) {
335             // It is important to set the closed to true, before closing channels.
336             // Our invariants are:
337             // closed=true happens-before ChannelGroup.close()
338             // ChannelGroup.add() happens-before checking closed==true
339             //
340             // See https://github.com/netty/netty/issues/4020
341             closed = true;
342         }
343 
344         for (Channel c: serverChannels.values()) {
345             if (matcher.matches(c)) {
346                 futures.put(c, c.close());
347             }
348         }
349         for (Channel c: nonServerChannels.values()) {
350             if (matcher.matches(c)) {
351                 futures.put(c, c.close());
352             }
353         }
354 
355         return new DefaultChannelGroupFuture(this, futures, executor);
356     }
357 
358     @Override
359     public ChannelGroupFuture deregister(ChannelMatcher matcher) {
360         if (matcher == null) {
361             throw new NullPointerException("matcher");
362         }
363 
364         Map<Channel, ChannelFuture> futures =
365                 new LinkedHashMap<Channel, ChannelFuture>(size());
366 
367         for (Channel c: serverChannels.values()) {
368             if (matcher.matches(c)) {
369                 futures.put(c, c.deregister());
370             }
371         }
372         for (Channel c: nonServerChannels.values()) {
373             if (matcher.matches(c)) {
374                 futures.put(c, c.deregister());
375             }
376         }
377 
378         return new DefaultChannelGroupFuture(this, futures, executor);
379     }
380 
381     @Override
382     public ChannelGroup flush(ChannelMatcher matcher) {
383         for (Channel c: nonServerChannels.values()) {
384             if (matcher.matches(c)) {
385                 c.flush();
386             }
387         }
388         return this;
389     }
390 
391     @Override
392     public ChannelGroupFuture flushAndWrite(Object message, ChannelMatcher matcher) {
393         return writeAndFlush(message, matcher);
394     }
395 
396     @Override
397     public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) {
398         return writeAndFlush(message, matcher, false);
399     }
400 
401     @Override
402     public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise) {
403         if (message == null) {
404             throw new NullPointerException("message");
405         }
406 
407         final ChannelGroupFuture future;
408         if (voidPromise) {
409             for (Channel c: nonServerChannels.values()) {
410                 if (matcher.matches(c)) {
411                     c.writeAndFlush(safeDuplicate(message), c.voidPromise());
412                 }
413             }
414             future = voidFuture;
415         } else {
416             Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
417             for (Channel c: nonServerChannels.values()) {
418                 if (matcher.matches(c)) {
419                     futures.put(c, c.writeAndFlush(safeDuplicate(message)));
420                 }
421             }
422             future = new DefaultChannelGroupFuture(this, futures, executor);
423         }
424         ReferenceCountUtil.release(message);
425         return future;
426     }
427 
428     @Override
429     public ChannelGroupFuture newCloseFuture() {
430         return newCloseFuture(ChannelMatchers.all());
431     }
432 
433     @Override
434     public ChannelGroupFuture newCloseFuture(ChannelMatcher matcher) {
435         Map<Channel, ChannelFuture> futures =
436                 new LinkedHashMap<Channel, ChannelFuture>(size());
437 
438         for (Channel c: serverChannels.values()) {
439             if (matcher.matches(c)) {
440                 futures.put(c, c.closeFuture());
441             }
442         }
443         for (Channel c: nonServerChannels.values()) {
444             if (matcher.matches(c)) {
445                 futures.put(c, c.closeFuture());
446             }
447         }
448 
449         return new DefaultChannelGroupFuture(this, futures, executor);
450     }
451 
452     @Override
453     public int hashCode() {
454         return System.identityHashCode(this);
455     }
456 
457     @Override
458     public boolean equals(Object o) {
459         return this == o;
460     }
461 
462     @Override
463     public int compareTo(ChannelGroup o) {
464         int v = name().compareTo(o.name());
465         if (v != 0) {
466             return v;
467         }
468 
469         return System.identityHashCode(this) - System.identityHashCode(o);
470     }
471 
472     @Override
473     public String toString() {
474         return StringUtil.simpleClassName(this) + "(name: " + name() + ", size: " + size() + ')';
475     }
476 }