1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.group;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufHolder;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ServerChannel;
24 import io.netty.util.ReferenceCountUtil;
25 import io.netty.util.concurrent.EventExecutor;
26 import io.netty.util.internal.ConcurrentSet;
27 import io.netty.util.internal.StringUtil;
28
29 import java.util.AbstractSet;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Iterator;
33 import java.util.LinkedHashMap;
34 import java.util.Map;
35 import java.util.concurrent.atomic.AtomicInteger;
36
37
38
39
40 public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
41
42 private static final AtomicInteger nextId = new AtomicInteger();
43 private final String name;
44 private final EventExecutor executor;
45 private final ConcurrentSet<Channel> serverChannels = new ConcurrentSet<Channel>();
46 private final ConcurrentSet<Channel> nonServerChannels = new ConcurrentSet<Channel>();
47 private final ChannelFutureListener remover = new ChannelFutureListener() {
48 @Override
49 public void operationComplete(ChannelFuture future) throws Exception {
50 remove(future.channel());
51 }
52 };
53 private final boolean stayClosed;
54 private volatile boolean closed;
55
56
57
58
59
60 public DefaultChannelGroup(EventExecutor executor) {
61 this(executor, false);
62 }
63
64
65
66
67
68
69 public DefaultChannelGroup(String name, EventExecutor executor) {
70 this(name, executor, false);
71 }
72
73
74
75
76
77
78
79 public DefaultChannelGroup(EventExecutor executor, boolean stayClosed) {
80 this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor, stayClosed);
81 }
82
83
84
85
86
87
88
89
90 public DefaultChannelGroup(String name, EventExecutor executor, boolean stayClosed) {
91 if (name == null) {
92 throw new NullPointerException("name");
93 }
94 this.name = name;
95 this.executor = executor;
96 this.stayClosed = stayClosed;
97 }
98
99 @Override
100 public String name() {
101 return name;
102 }
103
104 @Override
105 public boolean isEmpty() {
106 return nonServerChannels.isEmpty() && serverChannels.isEmpty();
107 }
108
109 @Override
110 public int size() {
111 return nonServerChannels.size() + serverChannels.size();
112 }
113
114 @Override
115 public boolean contains(Object o) {
116 if (o instanceof Channel) {
117 Channel c = (Channel) o;
118 if (o instanceof ServerChannel) {
119 return serverChannels.contains(c);
120 } else {
121 return nonServerChannels.contains(c);
122 }
123 } else {
124 return false;
125 }
126 }
127
128 @Override
129 public boolean add(Channel channel) {
130 ConcurrentSet<Channel> set =
131 channel instanceof ServerChannel? serverChannels : nonServerChannels;
132
133 boolean added = set.add(channel);
134 if (added) {
135 channel.closeFuture().addListener(remover);
136 }
137
138 if (stayClosed && closed) {
139
140
141
142
143
144
145
146
147
148
149
150
151 channel.close();
152 }
153
154 return added;
155 }
156
157 @Override
158 public boolean remove(Object o) {
159 if (!(o instanceof Channel)) {
160 return false;
161 }
162 boolean removed;
163 Channel c = (Channel) o;
164 if (c instanceof ServerChannel) {
165 removed = serverChannels.remove(c);
166 } else {
167 removed = nonServerChannels.remove(c);
168 }
169 if (!removed) {
170 return false;
171 }
172
173 c.closeFuture().removeListener(remover);
174 return true;
175 }
176
177 @Override
178 public void clear() {
179 nonServerChannels.clear();
180 serverChannels.clear();
181 }
182
183 @Override
184 public Iterator<Channel> iterator() {
185 return new CombinedIterator<Channel>(
186 serverChannels.iterator(),
187 nonServerChannels.iterator());
188 }
189
190 @Override
191 public Object[] toArray() {
192 Collection<Channel> channels = new ArrayList<Channel>(size());
193 channels.addAll(serverChannels);
194 channels.addAll(nonServerChannels);
195 return channels.toArray();
196 }
197
198 @Override
199 public <T> T[] toArray(T[] a) {
200 Collection<Channel> channels = new ArrayList<Channel>(size());
201 channels.addAll(serverChannels);
202 channels.addAll(nonServerChannels);
203 return channels.toArray(a);
204 }
205
206 @Override
207 public ChannelGroupFuture close() {
208 return close(ChannelMatchers.all());
209 }
210
211 @Override
212 public ChannelGroupFuture disconnect() {
213 return disconnect(ChannelMatchers.all());
214 }
215
216 @Override
217 public ChannelGroupFuture deregister() {
218 return deregister(ChannelMatchers.all());
219 }
220
221 @Override
222 public ChannelGroupFuture write(Object message) {
223 return write(message, ChannelMatchers.all());
224 }
225
226
227
228 private static Object safeDuplicate(Object message) {
229 if (message instanceof ByteBuf) {
230 return ((ByteBuf) message).duplicate().retain();
231 } else if (message instanceof ByteBufHolder) {
232 return ((ByteBufHolder) message).duplicate().retain();
233 } else {
234 return ReferenceCountUtil.retain(message);
235 }
236 }
237
238 @Override
239 public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
240 if (message == null) {
241 throw new NullPointerException("message");
242 }
243 if (matcher == null) {
244 throw new NullPointerException("matcher");
245 }
246
247 Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
248 for (Channel c: nonServerChannels) {
249 if (matcher.matches(c)) {
250 futures.put(c, c.write(safeDuplicate(message)));
251 }
252 }
253
254 ReferenceCountUtil.release(message);
255 return new DefaultChannelGroupFuture(this, futures, executor);
256 }
257
258 @Override
259 public ChannelGroup flush() {
260 return flush(ChannelMatchers.all());
261 }
262
263 @Override
264 public ChannelGroupFuture flushAndWrite(Object message) {
265 return writeAndFlush(message);
266 }
267
268 @Override
269 public ChannelGroupFuture writeAndFlush(Object message) {
270 return writeAndFlush(message, ChannelMatchers.all());
271 }
272
273 @Override
274 public ChannelGroupFuture disconnect(ChannelMatcher matcher) {
275 if (matcher == null) {
276 throw new NullPointerException("matcher");
277 }
278
279 Map<Channel, ChannelFuture> futures =
280 new LinkedHashMap<Channel, ChannelFuture>(size());
281
282 for (Channel c: serverChannels) {
283 if (matcher.matches(c)) {
284 futures.put(c, c.disconnect());
285 }
286 }
287 for (Channel c: nonServerChannels) {
288 if (matcher.matches(c)) {
289 futures.put(c, c.disconnect());
290 }
291 }
292
293 return new DefaultChannelGroupFuture(this, futures, executor);
294 }
295
296 @Override
297 public ChannelGroupFuture close(ChannelMatcher matcher) {
298 if (matcher == null) {
299 throw new NullPointerException("matcher");
300 }
301
302 Map<Channel, ChannelFuture> futures =
303 new LinkedHashMap<Channel, ChannelFuture>(size());
304
305 if (stayClosed) {
306
307
308
309
310
311
312 closed = true;
313 }
314
315 for (Channel c: serverChannels) {
316 if (matcher.matches(c)) {
317 futures.put(c, c.close());
318 }
319 }
320 for (Channel c: nonServerChannels) {
321 if (matcher.matches(c)) {
322 futures.put(c, c.close());
323 }
324 }
325
326 return new DefaultChannelGroupFuture(this, futures, executor);
327 }
328
329 @Override
330 public ChannelGroupFuture deregister(ChannelMatcher matcher) {
331 if (matcher == null) {
332 throw new NullPointerException("matcher");
333 }
334
335 Map<Channel, ChannelFuture> futures =
336 new LinkedHashMap<Channel, ChannelFuture>(size());
337
338 for (Channel c: serverChannels) {
339 if (matcher.matches(c)) {
340 futures.put(c, c.deregister());
341 }
342 }
343 for (Channel c: nonServerChannels) {
344 if (matcher.matches(c)) {
345 futures.put(c, c.deregister());
346 }
347 }
348
349 return new DefaultChannelGroupFuture(this, futures, executor);
350 }
351
352 @Override
353 public ChannelGroup flush(ChannelMatcher matcher) {
354 for (Channel c: nonServerChannels) {
355 if (matcher.matches(c)) {
356 c.flush();
357 }
358 }
359 return this;
360 }
361
362 @Override
363 public ChannelGroupFuture flushAndWrite(Object message, ChannelMatcher matcher) {
364 return writeAndFlush(message, matcher);
365 }
366
367 @Override
368 public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) {
369 if (message == null) {
370 throw new NullPointerException("message");
371 }
372
373 Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
374
375 for (Channel c: nonServerChannels) {
376 if (matcher.matches(c)) {
377 futures.put(c, c.writeAndFlush(safeDuplicate(message)));
378 }
379 }
380
381 ReferenceCountUtil.release(message);
382
383 return new DefaultChannelGroupFuture(this, futures, executor);
384 }
385
386
387
388
389
390 public ChannelGroupFuture newCloseFuture() {
391 return newCloseFuture(ChannelMatchers.all());
392 }
393
394
395
396
397
398 public ChannelGroupFuture newCloseFuture(ChannelMatcher matcher) {
399 Map<Channel, ChannelFuture> futures =
400 new LinkedHashMap<Channel, ChannelFuture>(size());
401
402 for (Channel c: serverChannels) {
403 if (matcher.matches(c)) {
404 futures.put(c, c.closeFuture());
405 }
406 }
407 for (Channel c: nonServerChannels) {
408 if (matcher.matches(c)) {
409 futures.put(c, c.closeFuture());
410 }
411 }
412
413 return new DefaultChannelGroupFuture(this, futures, executor);
414 }
415
416 @Override
417 public int hashCode() {
418 return System.identityHashCode(this);
419 }
420
421 @Override
422 public boolean equals(Object o) {
423 return this == o;
424 }
425
426 @Override
427 public int compareTo(ChannelGroup o) {
428 int v = name().compareTo(o.name());
429 if (v != 0) {
430 return v;
431 }
432
433 return System.identityHashCode(this) - System.identityHashCode(o);
434 }
435
436 @Override
437 public String toString() {
438 return StringUtil.simpleClassName(this) + "(name: " + name() + ", size: " + size() + ')';
439 }
440 }