1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import java.net.SocketAddress;
19 import java.util.Random;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
23
24
25
26
27 public abstract class AbstractChannel implements Channel {
28
29 static final ConcurrentMap<Integer, Channel> allChannels = new ConcurrentHashMap<Integer, Channel>();
30
31 private static final Random random = new Random();
32
33 private static Integer allocateId(Channel channel) {
34 Integer id = random.nextInt();
35 for (;;) {
36
37
38 if (allChannels.putIfAbsent(id, channel) == null) {
39
40 return id;
41 } else {
42
43 id = id.intValue() + 1;
44 }
45 }
46 }
47
48 private final Integer id;
49 private final Channel parent;
50 private final ChannelFactory factory;
51 private final ChannelPipeline pipeline;
52 private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
53 private final ChannelCloseFuture closeFuture = new ChannelCloseFuture();
54 private volatile int interestOps = OP_READ;
55
56
57 private boolean strValConnected;
58 private String strVal;
59 private volatile Object attachment;
60
61 private static final AtomicIntegerFieldUpdater<AbstractChannel> UNWRITABLE_UPDATER;
62 @SuppressWarnings("UnusedDeclaration")
63 private volatile int unwritable;
64
65 static {
66 UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannel.class, "unwritable");
67 }
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 protected AbstractChannel(
83 Channel parent, ChannelFactory factory,
84 ChannelPipeline pipeline, ChannelSink sink) {
85
86 this.parent = parent;
87 this.factory = factory;
88 this.pipeline = pipeline;
89
90 id = allocateId(this);
91
92 pipeline.attach(this, sink);
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 protected AbstractChannel(
110 Integer id,
111 Channel parent, ChannelFactory factory,
112 ChannelPipeline pipeline, ChannelSink sink) {
113
114 this.id = id;
115 this.parent = parent;
116 this.factory = factory;
117 this.pipeline = pipeline;
118 pipeline.attach(this, sink);
119 }
120
121 public final Integer getId() {
122 return id;
123 }
124
125 public Channel getParent() {
126 return parent;
127 }
128
129 public ChannelFactory getFactory() {
130 return factory;
131 }
132
133 public ChannelPipeline getPipeline() {
134 return pipeline;
135 }
136
137
138
139
140 protected ChannelFuture getSucceededFuture() {
141 return succeededFuture;
142 }
143
144
145
146
147
148 protected ChannelFuture getUnsupportedOperationFuture() {
149 return new FailedChannelFuture(this, new UnsupportedOperationException());
150 }
151
152
153
154
155 @Override
156 public final int hashCode() {
157 return id;
158 }
159
160
161
162
163
164 @Override
165 public final boolean equals(Object o) {
166 return this == o;
167 }
168
169
170
171
172 public final int compareTo(Channel o) {
173 return getId().compareTo(o.getId());
174 }
175
176 public boolean isOpen() {
177 return !closeFuture.isDone();
178 }
179
180
181
182
183
184
185
186
187
188 protected boolean setClosed() {
189
190
191 allChannels.remove(id);
192
193 return closeFuture.setClosed();
194 }
195
196 public ChannelFuture bind(SocketAddress localAddress) {
197 return Channels.bind(this, localAddress);
198 }
199
200 public ChannelFuture unbind() {
201 return Channels.unbind(this);
202 }
203
204 public ChannelFuture close() {
205 ChannelFuture returnedCloseFuture = Channels.close(this);
206 assert closeFuture == returnedCloseFuture;
207 return closeFuture;
208 }
209
210 public ChannelFuture getCloseFuture() {
211 return closeFuture;
212 }
213
214 public ChannelFuture connect(SocketAddress remoteAddress) {
215 return Channels.connect(this, remoteAddress);
216 }
217
218 public ChannelFuture disconnect() {
219 return Channels.disconnect(this);
220 }
221
222 public int getInterestOps() {
223 if (!isOpen()) {
224 return Channel.OP_WRITE;
225 }
226
227 int interestOps = getInternalInterestOps() & ~OP_WRITE;
228 if (!isWritable()) {
229 interestOps |= OP_WRITE;
230 }
231 return interestOps;
232 }
233
234 public ChannelFuture setInterestOps(int interestOps) {
235 return Channels.setInterestOps(this, interestOps);
236 }
237
238 protected int getInternalInterestOps() {
239 return interestOps;
240 }
241
242
243
244
245
246
247 protected void setInternalInterestOps(int interestOps) {
248 this.interestOps = interestOps;
249 }
250
251 public boolean isReadable() {
252 return (getInternalInterestOps() & OP_READ) != 0;
253 }
254
255 public boolean isWritable() {
256 return unwritable == 0;
257 }
258
259 public final boolean getUserDefinedWritability(int index) {
260 return (unwritable & writabilityMask(index)) == 0;
261 }
262
263 public final void setUserDefinedWritability(int index, boolean writable) {
264 if (writable) {
265 setUserDefinedWritability(index);
266 } else {
267 clearUserDefinedWritability(index);
268 }
269 }
270
271 private void setUserDefinedWritability(int index) {
272 final int mask = ~writabilityMask(index);
273 for (;;) {
274 final int oldValue = unwritable;
275 final int newValue = oldValue & mask;
276 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
277 if (oldValue != 0 && newValue == 0) {
278 getPipeline().sendUpstream(
279 new UpstreamChannelStateEvent(
280 this, ChannelState.INTEREST_OPS, getInterestOps()));
281 }
282 break;
283 }
284 }
285 }
286
287 private void clearUserDefinedWritability(int index) {
288 final int mask = writabilityMask(index);
289 for (;;) {
290 final int oldValue = unwritable;
291 final int newValue = oldValue | mask;
292 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
293 if (oldValue == 0 && newValue != 0) {
294 getPipeline().sendUpstream(
295 new UpstreamChannelStateEvent(
296 this, ChannelState.INTEREST_OPS, getInterestOps()));
297 }
298 break;
299 }
300 }
301 }
302
303 private static int writabilityMask(int index) {
304 if (index < 1 || index > 31) {
305 throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
306 }
307 return 1 << index;
308 }
309
310 protected boolean setWritable() {
311 for (;;) {
312 final int oldValue = unwritable;
313 final int newValue = oldValue & ~1;
314 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
315 if (oldValue != 0 && newValue == 0) {
316 return true;
317 }
318 break;
319 }
320 }
321 return false;
322 }
323
324 protected boolean setUnwritable() {
325 for (;;) {
326 final int oldValue = unwritable;
327 final int newValue = oldValue | 1;
328 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
329 if (oldValue == 0 && newValue != 0) {
330 return true;
331 }
332 break;
333 }
334 }
335 return false;
336 }
337
338 public ChannelFuture setReadable(boolean readable) {
339 if (readable) {
340 return setInterestOps(getInterestOps() | OP_READ);
341 } else {
342 return setInterestOps(getInterestOps() & ~OP_READ);
343 }
344 }
345
346 public ChannelFuture write(Object message) {
347 return Channels.write(this, message);
348 }
349
350 public ChannelFuture write(Object message, SocketAddress remoteAddress) {
351 return Channels.write(this, message, remoteAddress);
352 }
353
354 public Object getAttachment() {
355 return attachment;
356 }
357
358 public void setAttachment(Object attachment) {
359 this.attachment = attachment;
360 }
361
362
363
364
365
366
367 @Override
368 public String toString() {
369 boolean connected = isConnected();
370 if (strValConnected == connected && strVal != null) {
371 return strVal;
372 }
373
374 StringBuilder buf = new StringBuilder(128);
375 buf.append("[id: 0x");
376 buf.append(getIdString());
377
378 SocketAddress localAddress = getLocalAddress();
379 SocketAddress remoteAddress = getRemoteAddress();
380 if (remoteAddress != null) {
381 buf.append(", ");
382 if (getParent() == null) {
383 buf.append(localAddress);
384 buf.append(connected? " => " : " :> ");
385 buf.append(remoteAddress);
386 } else {
387 buf.append(remoteAddress);
388 buf.append(connected? " => " : " :> ");
389 buf.append(localAddress);
390 }
391 } else if (localAddress != null) {
392 buf.append(", ");
393 buf.append(localAddress);
394 }
395
396 buf.append(']');
397
398 String strVal = buf.toString();
399 this.strVal = strVal;
400 strValConnected = connected;
401 return strVal;
402 }
403
404 private String getIdString() {
405 String answer = Integer.toHexString(id.intValue());
406 switch (answer.length()) {
407 case 0:
408 answer = "00000000";
409 break;
410 case 1:
411 answer = "0000000" + answer;
412 break;
413 case 2:
414 answer = "000000" + answer;
415 break;
416 case 3:
417 answer = "00000" + answer;
418 break;
419 case 4:
420 answer = "0000" + answer;
421 break;
422 case 5:
423 answer = "000" + answer;
424 break;
425 case 6:
426 answer = "00" + answer;
427 break;
428 case 7:
429 answer = '0' + answer;
430 break;
431 }
432 return answer;
433 }
434
435 private final class ChannelCloseFuture extends DefaultChannelFuture {
436
437 ChannelCloseFuture() {
438 super(AbstractChannel.this, false);
439 }
440
441 @Override
442 public boolean setSuccess() {
443
444 return false;
445 }
446
447 @Override
448 public boolean setFailure(Throwable cause) {
449
450 return false;
451 }
452
453 boolean setClosed() {
454 return super.setSuccess();
455 }
456 }
457 }