View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.group;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelId;
24  import io.netty.channel.ServerChannel;
25  import io.netty.util.ReferenceCountUtil;
26  import io.netty.util.concurrent.EventExecutor;
27  import io.netty.util.internal.ObjectUtil;
28  import io.netty.util.internal.PlatformDependent;
29  import io.netty.util.internal.StringUtil;
30  
31  import java.util.AbstractSet;
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.Iterator;
35  import java.util.LinkedHashMap;
36  import java.util.Map;
37  import java.util.concurrent.ConcurrentMap;
38  import java.util.concurrent.atomic.AtomicInteger;
39  
40  /**
41   * The default {@link ChannelGroup} implementation.
42   */
43  public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
44  
45      private static final AtomicInteger nextId = new AtomicInteger();
46      private final String name;
47      private final EventExecutor executor;
48      private final ConcurrentMap<ChannelId, Channel> serverChannels = PlatformDependent.newConcurrentHashMap();
49      private final ConcurrentMap<ChannelId, Channel> nonServerChannels = PlatformDependent.newConcurrentHashMap();
50      private final ChannelFutureListener remover = new ChannelFutureListener() {
51          @Override
52          public void operationComplete(ChannelFuture future) throws Exception {
53              remove(future.channel());
54          }
55      };
56      private final VoidChannelGroupFuture voidFuture = new VoidChannelGroupFuture(this);
57      private final boolean stayClosed;
58      private volatile boolean closed;
59  
60      /**
61       * Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
62       * {@link ChannelGroupFuture}s.
63       */
64      public DefaultChannelGroup(EventExecutor executor) {
65          this(executor, false);
66      }
67  
68      /**
69       * Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
70       * {@link ChannelGroupFuture}s.  Please note that different groups can have the same name, which means no
71       * duplicate check is done against group names.
72       */
73      public DefaultChannelGroup(String name, EventExecutor executor) {
74          this(name, executor, false);
75      }
76  
77      /**
78       * Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
79       * {@link ChannelGroupFuture}s. {@code stayClosed} defines whether or not, this group can be closed
80       * more than once. Adding channels to a closed group will immediately close them, too. This makes it
81       * easy, to shutdown server and child channels at once.
82       */
83      public DefaultChannelGroup(EventExecutor executor, boolean stayClosed) {
84          this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor, stayClosed);
85      }
86  
87      /**
88       * Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
89       * {@link ChannelGroupFuture}s. {@code stayClosed} defines whether or not, this group can be closed
90       * more than once. Adding channels to a closed group will immediately close them, too. This makes it
91       * easy, to shutdown server and child channels at once. Please note that different groups can have
92       * the same name, which means no duplicate check is done against group names.
93       */
94      public DefaultChannelGroup(String name, EventExecutor executor, boolean stayClosed) {
95          ObjectUtil.checkNotNull(name, "name");
96          this.name = name;
97          this.executor = executor;
98          this.stayClosed = stayClosed;
99      }
100 
101     @Override
102     public String name() {
103         return name;
104     }
105 
106     @Override
107     public Channel find(ChannelId id) {
108         Channel c = nonServerChannels.get(id);
109         if (c != null) {
110             return c;
111         } else {
112             return serverChannels.get(id);
113         }
114     }
115 
116     @Override
117     public boolean isEmpty() {
118         return nonServerChannels.isEmpty() && serverChannels.isEmpty();
119     }
120 
121     @Override
122     public int size() {
123         return nonServerChannels.size() + serverChannels.size();
124     }
125 
126     @Override
127     public boolean contains(Object o) {
128         if (o instanceof ServerChannel) {
129             return serverChannels.containsValue(o);
130         } else if (o instanceof Channel) {
131             return nonServerChannels.containsValue(o);
132         }
133         return false;
134     }
135 
136     @Override
137     public boolean add(Channel channel) {
138         ConcurrentMap<ChannelId, Channel> map =
139             channel instanceof ServerChannel? serverChannels : nonServerChannels;
140 
141         boolean added = map.putIfAbsent(channel.id(), channel) == null;
142         if (added) {
143             channel.closeFuture().addListener(remover);
144         }
145 
146         if (stayClosed && closed) {
147 
148             // First add channel, than check if closed.
149             // Seems inefficient at first, but this way a volatile
150             // gives us enough synchronization to be thread-safe.
151             //
152             // If true: Close right away.
153             // (Might be closed a second time by ChannelGroup.close(), but this is ok)
154             //
155             // If false: Channel will definitely be closed by the ChannelGroup.
156             // (Because closed=true always happens-before ChannelGroup.close())
157             //
158             // See https://github.com/netty/netty/issues/4020
159             channel.close();
160         }
161 
162         return added;
163     }
164 
165     @Override
166     public boolean remove(Object o) {
167         Channel c = null;
168         if (o instanceof ChannelId) {
169             c = nonServerChannels.remove(o);
170             if (c == null) {
171                 c = serverChannels.remove(o);
172             }
173         } else if (o instanceof Channel) {
174             c = (Channel) o;
175             if (c instanceof ServerChannel) {
176                 c = serverChannels.remove(c.id());
177             } else {
178                 c = nonServerChannels.remove(c.id());
179             }
180         }
181 
182         if (c == null) {
183             return false;
184         }
185 
186         c.closeFuture().removeListener(remover);
187         return true;
188     }
189 
190     @Override
191     public void clear() {
192         nonServerChannels.clear();
193         serverChannels.clear();
194     }
195 
196     @Override
197     public Iterator<Channel> iterator() {
198         return new CombinedIterator<Channel>(
199                 serverChannels.values().iterator(),
200                 nonServerChannels.values().iterator());
201     }
202 
203     @Override
204     public Object[] toArray() {
205         Collection<Channel> channels = new ArrayList<Channel>(size());
206         channels.addAll(serverChannels.values());
207         channels.addAll(nonServerChannels.values());
208         return channels.toArray();
209     }
210 
211     @Override
212     public <T> T[] toArray(T[] a) {
213         Collection<Channel> channels = new ArrayList<Channel>(size());
214         channels.addAll(serverChannels.values());
215         channels.addAll(nonServerChannels.values());
216         return channels.toArray(a);
217     }
218 
219     @Override
220     public ChannelGroupFuture close() {
221         return close(ChannelMatchers.all());
222     }
223 
224     @Override
225     public ChannelGroupFuture disconnect() {
226         return disconnect(ChannelMatchers.all());
227     }
228 
229     @Override
230     public ChannelGroupFuture deregister() {
231         return deregister(ChannelMatchers.all());
232     }
233 
234     @Override
235     public ChannelGroupFuture write(Object message) {
236         return write(message, ChannelMatchers.all());
237     }
238 
239     // Create a safe duplicate of the message to write it to a channel but not affect other writes.
240     // See https://github.com/netty/netty/issues/1461
241     private static Object safeDuplicate(Object message) {
242         if (message instanceof ByteBuf) {
243             return ((ByteBuf) message).retainedDuplicate();
244         } else if (message instanceof ByteBufHolder) {
245             return ((ByteBufHolder) message).retainedDuplicate();
246         } else {
247             return ReferenceCountUtil.retain(message);
248         }
249     }
250 
251     @Override
252     public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
253         return write(message, matcher, false);
254     }
255 
256     @Override
257     public ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise) {
258         ObjectUtil.checkNotNull(message, "message");
259         ObjectUtil.checkNotNull(matcher, "matcher");
260 
261         final ChannelGroupFuture future;
262         if (voidPromise) {
263             for (Channel c: nonServerChannels.values()) {
264                 if (matcher.matches(c)) {
265                     c.write(safeDuplicate(message), c.voidPromise());
266                 }
267             }
268             future = voidFuture;
269         } else {
270             Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(nonServerChannels.size());
271             for (Channel c: nonServerChannels.values()) {
272                 if (matcher.matches(c)) {
273                     futures.put(c, c.write(safeDuplicate(message)));
274                 }
275             }
276             future = new DefaultChannelGroupFuture(this, futures, executor);
277         }
278         ReferenceCountUtil.release(message);
279         return future;
280     }
281 
282     @Override
283     public ChannelGroup flush() {
284         return flush(ChannelMatchers.all());
285     }
286 
287     @Override
288     public ChannelGroupFuture flushAndWrite(Object message) {
289         return writeAndFlush(message);
290     }
291 
292     @Override
293     public ChannelGroupFuture writeAndFlush(Object message) {
294         return writeAndFlush(message, ChannelMatchers.all());
295     }
296 
297     @Override
298     public ChannelGroupFuture disconnect(ChannelMatcher matcher) {
299         ObjectUtil.checkNotNull(matcher, "matcher");
300 
301         Map<Channel, ChannelFuture> futures =
302                 new LinkedHashMap<Channel, ChannelFuture>(size());
303 
304         for (Channel c: serverChannels.values()) {
305             if (matcher.matches(c)) {
306                 futures.put(c, c.disconnect());
307             }
308         }
309         for (Channel c: nonServerChannels.values()) {
310             if (matcher.matches(c)) {
311                 futures.put(c, c.disconnect());
312             }
313         }
314 
315         return new DefaultChannelGroupFuture(this, futures, executor);
316     }
317 
318     @Override
319     public ChannelGroupFuture close(ChannelMatcher matcher) {
320         ObjectUtil.checkNotNull(matcher, "matcher");
321 
322         Map<Channel, ChannelFuture> futures =
323                 new LinkedHashMap<Channel, ChannelFuture>(size());
324 
325         if (stayClosed) {
326             // It is important to set the closed to true, before closing channels.
327             // Our invariants are:
328             // closed=true happens-before ChannelGroup.close()
329             // ChannelGroup.add() happens-before checking closed==true
330             //
331             // See https://github.com/netty/netty/issues/4020
332             closed = true;
333         }
334 
335         for (Channel c: serverChannels.values()) {
336             if (matcher.matches(c)) {
337                 futures.put(c, c.close());
338             }
339         }
340         for (Channel c: nonServerChannels.values()) {
341             if (matcher.matches(c)) {
342                 futures.put(c, c.close());
343             }
344         }
345 
346         return new DefaultChannelGroupFuture(this, futures, executor);
347     }
348 
349     @Override
350     public ChannelGroupFuture deregister(ChannelMatcher matcher) {
351         ObjectUtil.checkNotNull(matcher, "matcher");
352 
353         Map<Channel, ChannelFuture> futures =
354                 new LinkedHashMap<Channel, ChannelFuture>(size());
355 
356         for (Channel c: serverChannels.values()) {
357             if (matcher.matches(c)) {
358                 futures.put(c, c.deregister());
359             }
360         }
361         for (Channel c: nonServerChannels.values()) {
362             if (matcher.matches(c)) {
363                 futures.put(c, c.deregister());
364             }
365         }
366 
367         return new DefaultChannelGroupFuture(this, futures, executor);
368     }
369 
370     @Override
371     public ChannelGroup flush(ChannelMatcher matcher) {
372         for (Channel c: nonServerChannels.values()) {
373             if (matcher.matches(c)) {
374                 c.flush();
375             }
376         }
377         return this;
378     }
379 
380     @Override
381     public ChannelGroupFuture flushAndWrite(Object message, ChannelMatcher matcher) {
382         return writeAndFlush(message, matcher);
383     }
384 
385     @Override
386     public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) {
387         return writeAndFlush(message, matcher, false);
388     }
389 
390     @Override
391     public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise) {
392         ObjectUtil.checkNotNull(message, "message");
393 
394         final ChannelGroupFuture future;
395         if (voidPromise) {
396             for (Channel c: nonServerChannels.values()) {
397                 if (matcher.matches(c)) {
398                     c.writeAndFlush(safeDuplicate(message), c.voidPromise());
399                 }
400             }
401             future = voidFuture;
402         } else {
403             Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(nonServerChannels.size());
404             for (Channel c: nonServerChannels.values()) {
405                 if (matcher.matches(c)) {
406                     futures.put(c, c.writeAndFlush(safeDuplicate(message)));
407                 }
408             }
409             future = new DefaultChannelGroupFuture(this, futures, executor);
410         }
411         ReferenceCountUtil.release(message);
412         return future;
413     }
414 
415     @Override
416     public ChannelGroupFuture newCloseFuture() {
417         return newCloseFuture(ChannelMatchers.all());
418     }
419 
420     @Override
421     public ChannelGroupFuture newCloseFuture(ChannelMatcher matcher) {
422         Map<Channel, ChannelFuture> futures =
423                 new LinkedHashMap<Channel, ChannelFuture>(size());
424 
425         for (Channel c: serverChannels.values()) {
426             if (matcher.matches(c)) {
427                 futures.put(c, c.closeFuture());
428             }
429         }
430         for (Channel c: nonServerChannels.values()) {
431             if (matcher.matches(c)) {
432                 futures.put(c, c.closeFuture());
433             }
434         }
435 
436         return new DefaultChannelGroupFuture(this, futures, executor);
437     }
438 
439     @Override
440     public int hashCode() {
441         return System.identityHashCode(this);
442     }
443 
444     @Override
445     public boolean equals(Object o) {
446         return this == o;
447     }
448 
449     @Override
450     public int compareTo(ChannelGroup o) {
451         int v = name().compareTo(o.name());
452         if (v != 0) {
453             return v;
454         }
455 
456         return System.identityHashCode(this) - System.identityHashCode(o);
457     }
458 
459     @Override
460     public String toString() {
461         return StringUtil.simpleClassName(this) + "(name: " + name() + ", size: " + size() + ')';
462     }
463 }