1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.group;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.util.Resource;
20 import io.netty5.channel.Channel;
21 import io.netty5.channel.ChannelId;
22 import io.netty5.channel.ServerChannel;
23 import io.netty5.util.ReferenceCountUtil;
24 import io.netty5.util.concurrent.EventExecutor;
25 import io.netty5.util.concurrent.Future;
26 import io.netty5.util.concurrent.FutureContextListener;
27 import io.netty5.util.internal.StringUtil;
28
29 import java.util.AbstractSet;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Iterator;
33 import java.util.LinkedHashMap;
34 import java.util.Map;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ConcurrentMap;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import static java.util.Objects.requireNonNull;
40
41
42
43
44 public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
45
46 private static final AtomicInteger nextId = new AtomicInteger();
47 private final String name;
48 final EventExecutor executor;
49 private final ConcurrentMap<ChannelId, Channel> serverChannels = new ConcurrentHashMap<>();
50 private final ConcurrentMap<ChannelId, Channel> nonServerChannels = new ConcurrentHashMap<>();
51 private final FutureContextListener<Channel, Void> remover = (channel, future) -> remove(channel);
52 private final boolean stayClosed;
53 private volatile boolean closed;
54
55
56
57
58
59 public DefaultChannelGroup(EventExecutor executor) {
60 this(executor, false);
61 }
62
63
64
65
66
67
68 public DefaultChannelGroup(String name, EventExecutor executor) {
69 this(name, executor, false);
70 }
71
72
73
74
75
76
77
78 public DefaultChannelGroup(EventExecutor executor, boolean stayClosed) {
79 this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor, stayClosed);
80 }
81
82
83
84
85
86
87
88
89 public DefaultChannelGroup(String name, EventExecutor executor, boolean stayClosed) {
90 requireNonNull(name, "name");
91 this.name = name;
92 this.executor = executor;
93 this.stayClosed = stayClosed;
94 }
95
96 @Override
97 public String name() {
98 return name;
99 }
100
101 @Override
102 public Channel find(ChannelId id) {
103 Channel c = nonServerChannels.get(id);
104 if (c != null) {
105 return c;
106 } else {
107 return serverChannels.get(id);
108 }
109 }
110
111 @Override
112 public boolean isEmpty() {
113 return nonServerChannels.isEmpty() && serverChannels.isEmpty();
114 }
115
116 @Override
117 public int size() {
118 return nonServerChannels.size() + serverChannels.size();
119 }
120
121 @Override
122 public boolean contains(Object o) {
123 if (o instanceof ServerChannel) {
124 return serverChannels.containsValue(o);
125 } else if (o instanceof Channel) {
126 return nonServerChannels.containsValue(o);
127 }
128 return false;
129 }
130
131 @Override
132 public boolean add(Channel channel) {
133 ConcurrentMap<ChannelId, Channel> map =
134 channel instanceof ServerChannel? serverChannels : nonServerChannels;
135
136 boolean added = map.putIfAbsent(channel.id(), channel) == null;
137 if (added) {
138 channel.closeFuture().addListener(channel, remover);
139 }
140
141 if (stayClosed && closed) {
142
143
144
145
146
147
148
149
150
151
152
153
154 channel.close();
155 }
156
157 return added;
158 }
159
160 @Override
161 public boolean remove(Object o) {
162 Channel c = null;
163 if (o instanceof ChannelId) {
164 c = nonServerChannels.remove(o);
165 if (c == null) {
166 c = serverChannels.remove(o);
167 }
168 } else if (o instanceof Channel) {
169 c = (Channel) o;
170 if (c instanceof ServerChannel) {
171 c = serverChannels.remove(c.id());
172 } else {
173 c = nonServerChannels.remove(c.id());
174 }
175 }
176
177 return c != null;
178 }
179
180 @Override
181 public void clear() {
182 nonServerChannels.clear();
183 serverChannels.clear();
184 }
185
186 @Override
187 public Iterator<Channel> iterator() {
188 return new CombinedIterator<>(
189 serverChannels.values().iterator(),
190 nonServerChannels.values().iterator());
191 }
192
193 @Override
194 public Object[] toArray() {
195 Collection<Channel> channels = new ArrayList<>(size());
196 channels.addAll(serverChannels.values());
197 channels.addAll(nonServerChannels.values());
198 return channels.toArray();
199 }
200
201 @Override
202 public <T> T[] toArray(T[] a) {
203 Collection<Channel> channels = new ArrayList<>(size());
204 channels.addAll(serverChannels.values());
205 channels.addAll(nonServerChannels.values());
206 return channels.toArray(a);
207 }
208
209 @Override
210 public ChannelGroupFuture close() {
211 return close(ChannelMatchers.all());
212 }
213
214 @Override
215 public ChannelGroupFuture disconnect() {
216 return disconnect(ChannelMatchers.all());
217 }
218
219 @Override
220 public ChannelGroupFuture deregister() {
221 return deregister(ChannelMatchers.all());
222 }
223
224 @Override
225 public ChannelGroupFuture write(Object message) {
226 return write(message, ChannelMatchers.all());
227 }
228
229
230
231 private static Object safeDuplicate(Object message) {
232 if (message instanceof Buffer) {
233 return ((Buffer) message).copy();
234 } else {
235 return ReferenceCountUtil.retain(message);
236 }
237 }
238
239 @Override
240 public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
241 requireNonNull(message, "message");
242 requireNonNull(matcher, "matcher");
243
244 Map<Channel, Future<Void>> futures = new LinkedHashMap<>(nonServerChannels.size());
245 for (Channel c: nonServerChannels.values()) {
246 if (matcher.matches(c)) {
247 futures.put(c, c.write(safeDuplicate(message)));
248 }
249 }
250 ChannelGroupFuture future = new DefaultChannelGroupFuture(this, futures, executor);
251 Resource.dispose(message);
252 return future;
253 }
254
255 @Override
256 public ChannelGroup flush() {
257 return flush(ChannelMatchers.all());
258 }
259
260 @Override
261 public ChannelGroupFuture flushAndWrite(Object message) {
262 return writeAndFlush(message);
263 }
264
265 @Override
266 public ChannelGroupFuture writeAndFlush(Object message) {
267 return writeAndFlush(message, ChannelMatchers.all());
268 }
269
270 @Override
271 public ChannelGroupFuture disconnect(ChannelMatcher matcher) {
272 requireNonNull(matcher, "matcher");
273
274 Map<Channel, Future<Void>> futures =
275 new LinkedHashMap<>(size());
276
277 for (Channel c: serverChannels.values()) {
278 if (matcher.matches(c)) {
279 futures.put(c, c.disconnect());
280 }
281 }
282 for (Channel c: nonServerChannels.values()) {
283 if (matcher.matches(c)) {
284 futures.put(c, c.disconnect());
285 }
286 }
287
288 return new DefaultChannelGroupFuture(this, futures, executor);
289 }
290
291 @Override
292 public ChannelGroupFuture close(ChannelMatcher matcher) {
293 requireNonNull(matcher, "matcher");
294
295 Map<Channel, Future<Void>> futures =
296 new LinkedHashMap<>(size());
297
298 if (stayClosed) {
299
300
301
302
303
304
305 closed = true;
306 }
307
308 for (Channel c: serverChannels.values()) {
309 if (matcher.matches(c)) {
310 futures.put(c, c.close());
311 }
312 }
313 for (Channel c: nonServerChannels.values()) {
314 if (matcher.matches(c)) {
315 futures.put(c, c.close());
316 }
317 }
318
319 return new DefaultChannelGroupFuture(this, futures, executor);
320 }
321
322 @Override
323 public ChannelGroupFuture deregister(ChannelMatcher matcher) {
324 requireNonNull(matcher, "matcher");
325
326 Map<Channel, Future<Void>> futures =
327 new LinkedHashMap<>(size());
328
329 for (Channel c: serverChannels.values()) {
330 if (matcher.matches(c)) {
331 futures.put(c, c.deregister());
332 }
333 }
334 for (Channel c: nonServerChannels.values()) {
335 if (matcher.matches(c)) {
336 futures.put(c, c.deregister());
337 }
338 }
339
340 return new DefaultChannelGroupFuture(this, futures, executor);
341 }
342
343 @Override
344 public ChannelGroup flush(ChannelMatcher matcher) {
345 for (Channel c: nonServerChannels.values()) {
346 if (matcher.matches(c)) {
347 c.flush();
348 }
349 }
350 return this;
351 }
352
353 @Override
354 public ChannelGroupFuture flushAndWrite(Object message, ChannelMatcher matcher) {
355 return writeAndFlush(message, matcher);
356 }
357
358 @Override
359 public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) {
360 requireNonNull(message, "message");
361
362 Map<Channel, Future<Void>> futures = new LinkedHashMap<>(nonServerChannels.size());
363 for (Channel c: nonServerChannels.values()) {
364 if (matcher.matches(c)) {
365 futures.put(c, c.writeAndFlush(safeDuplicate(message)));
366 }
367 }
368 final ChannelGroupFuture future = new DefaultChannelGroupFuture(this, futures, executor);
369 Resource.dispose(message);
370 return future;
371 }
372
373 @Override
374 public ChannelGroupFuture newCloseFuture() {
375 return newCloseFuture(ChannelMatchers.all());
376 }
377
378 @Override
379 public ChannelGroupFuture newCloseFuture(ChannelMatcher matcher) {
380 Map<Channel, Future<Void>> futures =
381 new LinkedHashMap<>(size());
382
383 for (Channel c: serverChannels.values()) {
384 if (matcher.matches(c)) {
385 futures.put(c, c.closeFuture());
386 }
387 }
388 for (Channel c: nonServerChannels.values()) {
389 if (matcher.matches(c)) {
390 futures.put(c, c.closeFuture());
391 }
392 }
393
394 return new DefaultChannelGroupFuture(this, futures, executor);
395 }
396
397 @Override
398 public int hashCode() {
399 return System.identityHashCode(this);
400 }
401
402 @Override
403 public boolean equals(Object o) {
404 return this == o;
405 }
406
407 @Override
408 public int compareTo(ChannelGroup o) {
409 int v = name().compareTo(o.name());
410 if (v != 0) {
411 return v;
412 }
413
414 return System.identityHashCode(this) - System.identityHashCode(o);
415 }
416
417 @Override
418 public String toString() {
419 return StringUtil.simpleClassName(this) + "(name: " + name() + ", size: " + size() + ')';
420 }
421 }