1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.group;
17
18 import java.net.SocketAddress;
19 import java.util.AbstractSet;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.LinkedHashMap;
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.jboss.netty.buffer.ChannelBuffer;
30 import org.jboss.netty.channel.Channel;
31 import org.jboss.netty.channel.ChannelFuture;
32 import org.jboss.netty.channel.ChannelFutureListener;
33 import org.jboss.netty.channel.ServerChannel;
34
35
36
37
38
39 public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
40
41 private static final AtomicInteger nextId = new AtomicInteger();
42
43 private final String name;
44 private final ConcurrentMap<Integer, Channel> serverChannels = new ConcurrentHashMap<Integer, Channel>();
45 private final ConcurrentMap<Integer, Channel> nonServerChannels = new ConcurrentHashMap<Integer, Channel>();
46 private final ChannelFutureListener remover = new ChannelFutureListener() {
47 public void operationComplete(ChannelFuture future) throws Exception {
48 remove(future.getChannel());
49 }
50 };
51
52
53
54
55 public DefaultChannelGroup() {
56 this("group-0x" + Integer.toHexString(nextId.incrementAndGet()));
57 }
58
59
60
61
62
63
64 public DefaultChannelGroup(String name) {
65 if (name == null) {
66 throw new NullPointerException("name");
67 }
68 this.name = name;
69 }
70
71 public String getName() {
72 return name;
73 }
74
75 @Override
76 public boolean isEmpty() {
77 return nonServerChannels.isEmpty() && serverChannels.isEmpty();
78 }
79
80 @Override
81 public int size() {
82 return nonServerChannels.size() + serverChannels.size();
83 }
84
85 public Channel find(Integer id) {
86 Channel c = nonServerChannels.get(id);
87 if (c != null) {
88 return c;
89 } else {
90 return serverChannels.get(id);
91 }
92 }
93
94 @Override
95 public boolean contains(Object o) {
96 if (o instanceof Integer) {
97 return nonServerChannels.containsKey(o) || serverChannels.containsKey(o);
98 } else if (o instanceof Channel) {
99 Channel c = (Channel) o;
100 if (o instanceof ServerChannel) {
101 return serverChannels.containsKey(c.getId());
102 } else {
103 return nonServerChannels.containsKey(c.getId());
104 }
105 } else {
106 return false;
107 }
108 }
109
110 @Override
111 public boolean add(Channel channel) {
112 ConcurrentMap<Integer, Channel> map =
113 channel instanceof ServerChannel? serverChannels : nonServerChannels;
114
115 boolean added = map.putIfAbsent(channel.getId(), channel) == null;
116 if (added) {
117 channel.getCloseFuture().addListener(remover);
118 }
119 return added;
120 }
121
122 @Override
123 public boolean remove(Object o) {
124 Channel c = null;
125 if (o instanceof Integer) {
126 c = nonServerChannels.remove(o);
127 if (c == null) {
128 c = serverChannels.remove(o);
129 }
130 } else if (o instanceof Channel) {
131 c = (Channel) o;
132 if (c instanceof ServerChannel) {
133 c = serverChannels.remove(c.getId());
134 } else {
135 c = nonServerChannels.remove(c.getId());
136 }
137 }
138
139 if (c == null) {
140 return false;
141 }
142
143 c.getCloseFuture().removeListener(remover);
144 return true;
145 }
146
147 @Override
148 public void clear() {
149 nonServerChannels.clear();
150 serverChannels.clear();
151 }
152
153 @Override
154 public Iterator<Channel> iterator() {
155 return new CombinedIterator<Channel>(
156 serverChannels.values().iterator(),
157 nonServerChannels.values().iterator());
158 }
159
160 @Override
161 public Object[] toArray() {
162 Collection<Channel> channels = new ArrayList<Channel>(size());
163 channels.addAll(serverChannels.values());
164 channels.addAll(nonServerChannels.values());
165 return channels.toArray();
166 }
167
168 @Override
169 public <T> T[] toArray(T[] a) {
170 Collection<Channel> channels = new ArrayList<Channel>(size());
171 channels.addAll(serverChannels.values());
172 channels.addAll(nonServerChannels.values());
173 return channels.toArray(a);
174 }
175
176 public ChannelGroupFuture close() {
177 Map<Integer, ChannelFuture> futures =
178 new LinkedHashMap<Integer, ChannelFuture>(size());
179
180 for (Channel c: serverChannels.values()) {
181 futures.put(c.getId(), c.close().awaitUninterruptibly());
182 }
183 for (Channel c: nonServerChannels.values()) {
184 futures.put(c.getId(), c.close());
185 }
186
187 return new DefaultChannelGroupFuture(this, futures);
188 }
189
190 public ChannelGroupFuture disconnect() {
191 Map<Integer, ChannelFuture> futures =
192 new LinkedHashMap<Integer, ChannelFuture>(size());
193
194 for (Channel c: serverChannels.values()) {
195 futures.put(c.getId(), c.disconnect().awaitUninterruptibly());
196 }
197 for (Channel c: nonServerChannels.values()) {
198 futures.put(c.getId(), c.disconnect());
199 }
200
201 return new DefaultChannelGroupFuture(this, futures);
202 }
203
204 public ChannelGroupFuture setInterestOps(int interestOps) {
205 Map<Integer, ChannelFuture> futures =
206 new LinkedHashMap<Integer, ChannelFuture>(size());
207
208 for (Channel c: serverChannels.values()) {
209 futures.put(c.getId(), c.setInterestOps(interestOps).awaitUninterruptibly());
210 }
211 for (Channel c: nonServerChannels.values()) {
212 futures.put(c.getId(), c.setInterestOps(interestOps));
213 }
214
215 return new DefaultChannelGroupFuture(this, futures);
216 }
217
218 public ChannelGroupFuture setReadable(boolean readable) {
219 Map<Integer, ChannelFuture> futures =
220 new LinkedHashMap<Integer, ChannelFuture>(size());
221
222 for (Channel c: serverChannels.values()) {
223 futures.put(c.getId(), c.setReadable(readable).awaitUninterruptibly());
224 }
225 for (Channel c: nonServerChannels.values()) {
226 futures.put(c.getId(), c.setReadable(readable));
227 }
228
229 return new DefaultChannelGroupFuture(this, futures);
230 }
231
232 public ChannelGroupFuture unbind() {
233 Map<Integer, ChannelFuture> futures =
234 new LinkedHashMap<Integer, ChannelFuture>(size());
235
236 for (Channel c: serverChannels.values()) {
237 futures.put(c.getId(), c.unbind().awaitUninterruptibly());
238 }
239 for (Channel c: nonServerChannels.values()) {
240 futures.put(c.getId(), c.unbind());
241 }
242
243 return new DefaultChannelGroupFuture(this, futures);
244 }
245
246 public ChannelGroupFuture write(Object message) {
247 Map<Integer, ChannelFuture> futures =
248 new LinkedHashMap<Integer, ChannelFuture>(size());
249 if (message instanceof ChannelBuffer) {
250 ChannelBuffer buf = (ChannelBuffer) message;
251 for (Channel c: nonServerChannels.values()) {
252 futures.put(c.getId(), c.write(buf.duplicate()));
253 }
254 } else {
255 for (Channel c: nonServerChannels.values()) {
256 futures.put(c.getId(), c.write(message));
257 }
258 }
259 return new DefaultChannelGroupFuture(this, futures);
260 }
261
262 public ChannelGroupFuture write(Object message, SocketAddress remoteAddress) {
263 Map<Integer, ChannelFuture> futures =
264 new LinkedHashMap<Integer, ChannelFuture>(size());
265 if (message instanceof ChannelBuffer) {
266 ChannelBuffer buf = (ChannelBuffer) message;
267 for (Channel c: nonServerChannels.values()) {
268 futures.put(c.getId(), c.write(buf.duplicate(), remoteAddress));
269 }
270 } else {
271 for (Channel c: nonServerChannels.values()) {
272 futures.put(c.getId(), c.write(message, remoteAddress));
273 }
274 }
275 return new DefaultChannelGroupFuture(this, futures);
276 }
277
278 @Override
279 public int hashCode() {
280 return System.identityHashCode(this);
281 }
282
283 @Override
284 public boolean equals(Object o) {
285 return this == o;
286 }
287
288 public int compareTo(ChannelGroup o) {
289 int v = getName().compareTo(o.getName());
290 if (v != 0) {
291 return v;
292 }
293
294 return System.identityHashCode(this) - System.identityHashCode(o);
295 }
296
297 @Override
298 public String toString() {
299 return getClass().getSimpleName() +
300 "(name: " + getName() + ", size: " + size() + ')';
301 }
302 }