View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.group;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.util.Resource;
20  import io.netty5.channel.Channel;
21  import io.netty5.channel.ChannelId;
22  import io.netty5.channel.ServerChannel;
23  import io.netty5.util.ReferenceCountUtil;
24  import io.netty5.util.concurrent.EventExecutor;
25  import io.netty5.util.concurrent.Future;
26  import io.netty5.util.concurrent.FutureContextListener;
27  import io.netty5.util.internal.StringUtil;
28  
29  import java.util.AbstractSet;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.Iterator;
33  import java.util.LinkedHashMap;
34  import java.util.Map;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  import static java.util.Objects.requireNonNull;
40  
41  /**
42   * The default {@link ChannelGroup} implementation.
43   */
44  public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
45  
46      private static final AtomicInteger nextId = new AtomicInteger();
47      private final String name;
48      final EventExecutor executor;
49      private final ConcurrentMap<ChannelId, Channel> serverChannels = new ConcurrentHashMap<>();
50      private final ConcurrentMap<ChannelId, Channel> nonServerChannels = new ConcurrentHashMap<>();
51      private final FutureContextListener<Channel, Void> remover = (channel, future) -> remove(channel);
52      private final boolean stayClosed;
53      private volatile boolean closed;
54  
55      /**
56       * Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
57       * {@link ChannelGroupFuture}s.
58       */
59      public DefaultChannelGroup(EventExecutor executor) {
60          this(executor, false);
61      }
62  
63      /**
64       * Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
65       * {@link ChannelGroupFuture}s.  Please note that different groups can have the same name, which means no
66       * duplicate check is done against group names.
67       */
68      public DefaultChannelGroup(String name, EventExecutor executor) {
69          this(name, executor, false);
70      }
71  
72      /**
73       * Creates a new group with a generated name and the provided {@link EventExecutor} to notify the
74       * {@link ChannelGroupFuture}s. {@code stayClosed} defines whether or not, this group can be closed
75       * more than once. Adding channels to a closed group will immediately close them, too. This makes it
76       * easy, to shutdown server and child channels at once.
77       */
78      public DefaultChannelGroup(EventExecutor executor, boolean stayClosed) {
79          this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor, stayClosed);
80      }
81  
82      /**
83       * Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
84       * {@link ChannelGroupFuture}s. {@code stayClosed} defines whether or not, this group can be closed
85       * more than once. Adding channels to a closed group will immediately close them, too. This makes it
86       * easy, to shutdown server and child channels at once. Please note that different groups can have
87       * the same name, which means no duplicate check is done against group names.
88       */
89      public DefaultChannelGroup(String name, EventExecutor executor, boolean stayClosed) {
90          requireNonNull(name, "name");
91          this.name = name;
92          this.executor = executor;
93          this.stayClosed = stayClosed;
94      }
95  
96      @Override
97      public String name() {
98          return name;
99      }
100 
101     @Override
102     public Channel find(ChannelId id) {
103         Channel c = nonServerChannels.get(id);
104         if (c != null) {
105             return c;
106         } else {
107             return serverChannels.get(id);
108         }
109     }
110 
111     @Override
112     public boolean isEmpty() {
113         return nonServerChannels.isEmpty() && serverChannels.isEmpty();
114     }
115 
116     @Override
117     public int size() {
118         return nonServerChannels.size() + serverChannels.size();
119     }
120 
121     @Override
122     public boolean contains(Object o) {
123         if (o instanceof ServerChannel) {
124             return serverChannels.containsValue(o);
125         } else if (o instanceof Channel) {
126             return nonServerChannels.containsValue(o);
127         }
128         return false;
129     }
130 
131     @Override
132     public boolean add(Channel channel) {
133         ConcurrentMap<ChannelId, Channel> map =
134             channel instanceof ServerChannel? serverChannels : nonServerChannels;
135 
136         boolean added = map.putIfAbsent(channel.id(), channel) == null;
137         if (added) {
138             channel.closeFuture().addListener(channel, remover);
139         }
140 
141         if (stayClosed && closed) {
142 
143             // First add channel, than check if closed.
144             // Seems inefficient at first, but this way a volatile
145             // gives us enough synchronization to be thread-safe.
146             //
147             // If true: Close right away.
148             // (Might be closed a second time by ChannelGroup.close(), but this is ok)
149             //
150             // If false: Channel will definitely be closed by the ChannelGroup.
151             // (Because closed=true always happens-before ChannelGroup.close())
152             //
153             // See https://github.com/netty/netty/issues/4020
154             channel.close();
155         }
156 
157         return added;
158     }
159 
160     @Override
161     public boolean remove(Object o) {
162         Channel c = null;
163         if (o instanceof ChannelId) {
164             c = nonServerChannels.remove(o);
165             if (c == null) {
166                 c = serverChannels.remove(o);
167             }
168         } else if (o instanceof Channel) {
169             c = (Channel) o;
170             if (c instanceof ServerChannel) {
171                 c = serverChannels.remove(c.id());
172             } else {
173                 c = nonServerChannels.remove(c.id());
174             }
175         }
176 
177         return c != null;
178     }
179 
180     @Override
181     public void clear() {
182         nonServerChannels.clear();
183         serverChannels.clear();
184     }
185 
186     @Override
187     public Iterator<Channel> iterator() {
188         return new CombinedIterator<>(
189                 serverChannels.values().iterator(),
190                 nonServerChannels.values().iterator());
191     }
192 
193     @Override
194     public Object[] toArray() {
195         Collection<Channel> channels = new ArrayList<>(size());
196         channels.addAll(serverChannels.values());
197         channels.addAll(nonServerChannels.values());
198         return channels.toArray();
199     }
200 
201     @Override
202     public <T> T[] toArray(T[] a) {
203         Collection<Channel> channels = new ArrayList<>(size());
204         channels.addAll(serverChannels.values());
205         channels.addAll(nonServerChannels.values());
206         return channels.toArray(a);
207     }
208 
209     @Override
210     public ChannelGroupFuture close() {
211         return close(ChannelMatchers.all());
212     }
213 
214     @Override
215     public ChannelGroupFuture disconnect() {
216         return disconnect(ChannelMatchers.all());
217     }
218 
219     @Override
220     public ChannelGroupFuture deregister() {
221         return deregister(ChannelMatchers.all());
222     }
223 
224     @Override
225     public ChannelGroupFuture write(Object message) {
226         return write(message, ChannelMatchers.all());
227     }
228 
229     // Create a safe duplicate of the message to write it to a channel but not affect other writes.
230     // See https://github.com/netty/netty/issues/1461
231     private static Object safeDuplicate(Object message) {
232         if (message instanceof Buffer) {
233             return ((Buffer) message).copy();
234         } else {
235             return ReferenceCountUtil.retain(message);
236         }
237     }
238 
239     @Override
240     public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
241         requireNonNull(message, "message");
242         requireNonNull(matcher, "matcher");
243 
244         Map<Channel, Future<Void>> futures = new LinkedHashMap<>(nonServerChannels.size());
245         for (Channel c: nonServerChannels.values()) {
246             if (matcher.matches(c)) {
247                 futures.put(c, c.write(safeDuplicate(message)));
248             }
249         }
250         ChannelGroupFuture future = new DefaultChannelGroupFuture(this, futures, executor);
251         Resource.dispose(message);
252         return future;
253     }
254 
255     @Override
256     public ChannelGroup flush() {
257         return flush(ChannelMatchers.all());
258     }
259 
260     @Override
261     public ChannelGroupFuture flushAndWrite(Object message) {
262         return writeAndFlush(message);
263     }
264 
265     @Override
266     public ChannelGroupFuture writeAndFlush(Object message) {
267         return writeAndFlush(message, ChannelMatchers.all());
268     }
269 
270     @Override
271     public ChannelGroupFuture disconnect(ChannelMatcher matcher) {
272         requireNonNull(matcher, "matcher");
273 
274         Map<Channel, Future<Void>> futures =
275                 new LinkedHashMap<>(size());
276 
277         for (Channel c: serverChannels.values()) {
278             if (matcher.matches(c)) {
279                 futures.put(c, c.disconnect());
280             }
281         }
282         for (Channel c: nonServerChannels.values()) {
283             if (matcher.matches(c)) {
284                 futures.put(c, c.disconnect());
285             }
286         }
287 
288         return new DefaultChannelGroupFuture(this, futures, executor);
289     }
290 
291     @Override
292     public ChannelGroupFuture close(ChannelMatcher matcher) {
293         requireNonNull(matcher, "matcher");
294 
295         Map<Channel, Future<Void>> futures =
296                 new LinkedHashMap<>(size());
297 
298         if (stayClosed) {
299             // It is important to set the closed to true, before closing channels.
300             // Our invariants are:
301             // closed=true happens-before ChannelGroup.close()
302             // ChannelGroup.add() happens-before checking closed==true
303             //
304             // See https://github.com/netty/netty/issues/4020
305             closed = true;
306         }
307 
308         for (Channel c: serverChannels.values()) {
309             if (matcher.matches(c)) {
310                 futures.put(c, c.close());
311             }
312         }
313         for (Channel c: nonServerChannels.values()) {
314             if (matcher.matches(c)) {
315                 futures.put(c, c.close());
316             }
317         }
318 
319         return new DefaultChannelGroupFuture(this, futures, executor);
320     }
321 
322     @Override
323     public ChannelGroupFuture deregister(ChannelMatcher matcher) {
324         requireNonNull(matcher, "matcher");
325 
326         Map<Channel, Future<Void>> futures =
327                 new LinkedHashMap<>(size());
328 
329         for (Channel c: serverChannels.values()) {
330             if (matcher.matches(c)) {
331                 futures.put(c, c.deregister());
332             }
333         }
334         for (Channel c: nonServerChannels.values()) {
335             if (matcher.matches(c)) {
336                 futures.put(c, c.deregister());
337             }
338         }
339 
340         return new DefaultChannelGroupFuture(this, futures, executor);
341     }
342 
343     @Override
344     public ChannelGroup flush(ChannelMatcher matcher) {
345         for (Channel c: nonServerChannels.values()) {
346             if (matcher.matches(c)) {
347                 c.flush();
348             }
349         }
350         return this;
351     }
352 
353     @Override
354     public ChannelGroupFuture flushAndWrite(Object message, ChannelMatcher matcher) {
355         return writeAndFlush(message, matcher);
356     }
357 
358     @Override
359     public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) {
360         requireNonNull(message, "message");
361 
362         Map<Channel, Future<Void>> futures = new LinkedHashMap<>(nonServerChannels.size());
363         for (Channel c: nonServerChannels.values()) {
364             if (matcher.matches(c)) {
365                 futures.put(c, c.writeAndFlush(safeDuplicate(message)));
366             }
367         }
368         final ChannelGroupFuture future = new DefaultChannelGroupFuture(this, futures, executor);
369         Resource.dispose(message);
370         return future;
371     }
372 
373     @Override
374     public ChannelGroupFuture newCloseFuture() {
375         return newCloseFuture(ChannelMatchers.all());
376     }
377 
378     @Override
379     public ChannelGroupFuture newCloseFuture(ChannelMatcher matcher) {
380         Map<Channel, Future<Void>> futures =
381                 new LinkedHashMap<>(size());
382 
383         for (Channel c: serverChannels.values()) {
384             if (matcher.matches(c)) {
385                 futures.put(c, c.closeFuture());
386             }
387         }
388         for (Channel c: nonServerChannels.values()) {
389             if (matcher.matches(c)) {
390                 futures.put(c, c.closeFuture());
391             }
392         }
393 
394         return new DefaultChannelGroupFuture(this, futures, executor);
395     }
396 
397     @Override
398     public int hashCode() {
399         return System.identityHashCode(this);
400     }
401 
402     @Override
403     public boolean equals(Object o) {
404         return this == o;
405     }
406 
407     @Override
408     public int compareTo(ChannelGroup o) {
409         int v = name().compareTo(o.name());
410         if (v != 0) {
411             return v;
412         }
413 
414         return System.identityHashCode(this) - System.identityHashCode(o);
415     }
416 
417     @Override
418     public String toString() {
419         return StringUtil.simpleClassName(this) + "(name: " + name() + ", size: " + size() + ')';
420     }
421 }