1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.local;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.nio.channels.ClosedChannelException;
21 import java.nio.channels.NotYetConnectedException;
22 import java.util.Queue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 import org.jboss.netty.channel.AbstractChannel;
27 import org.jboss.netty.channel.ChannelConfig;
28 import org.jboss.netty.channel.ChannelException;
29 import org.jboss.netty.channel.ChannelFactory;
30 import org.jboss.netty.channel.ChannelFuture;
31 import org.jboss.netty.channel.ChannelFutureListener;
32 import org.jboss.netty.channel.ChannelPipeline;
33 import org.jboss.netty.channel.ChannelSink;
34 import org.jboss.netty.channel.DefaultChannelConfig;
35 import org.jboss.netty.channel.MessageEvent;
36 import org.jboss.netty.util.internal.ThreadLocalBoolean;
37
38
39
40 final class DefaultLocalChannel extends AbstractChannel implements LocalChannel {
41
42
43 private static final int ST_OPEN = 0;
44 private static final int ST_BOUND = 1;
45 private static final int ST_CONNECTED = 2;
46 private static final int ST_CLOSED = -1;
47 final AtomicInteger state = new AtomicInteger(ST_OPEN);
48
49 private final ChannelConfig config;
50 private final ThreadLocalBoolean delivering = new ThreadLocalBoolean();
51
52 final Queue<MessageEvent> writeBuffer = new ConcurrentLinkedQueue<MessageEvent>();
53
54 volatile DefaultLocalChannel pairedChannel;
55 volatile LocalAddress localAddress;
56 volatile LocalAddress remoteAddress;
57
58 DefaultLocalChannel(
59 LocalServerChannel parent, ChannelFactory factory, ChannelPipeline pipeline,
60 ChannelSink sink, DefaultLocalChannel pairedChannel) {
61 super(parent, factory, pipeline, sink);
62 this.pairedChannel = pairedChannel;
63 config = new DefaultChannelConfig();
64
65
66
67 getCloseFuture().addListener(new ChannelFutureListener() {
68 public void operationComplete(ChannelFuture future) throws Exception {
69 state.set(ST_CLOSED);
70 }
71 });
72
73 fireChannelOpen(this);
74 }
75
76 public ChannelConfig getConfig() {
77 return config;
78 }
79
80 @Override
81 public boolean isOpen() {
82 return state.get() >= ST_OPEN;
83 }
84
85 public boolean isBound() {
86 return state.get() >= ST_BOUND;
87 }
88
89 public boolean isConnected() {
90 return state.get() == ST_CONNECTED;
91 }
92
93 void setBound() throws ClosedChannelException {
94 if (!state.compareAndSet(ST_OPEN, ST_BOUND)) {
95 switch (state.get()) {
96 case ST_CLOSED:
97 throw new ClosedChannelException();
98 default:
99 throw new ChannelException("already bound");
100 }
101 }
102 }
103
104 void setConnected() {
105 if (state.get() != ST_CLOSED) {
106 state.set(ST_CONNECTED);
107 }
108 }
109
110 @Override
111 protected boolean setClosed() {
112 return super.setClosed();
113 }
114
115 public LocalAddress getLocalAddress() {
116 return localAddress;
117 }
118
119 public LocalAddress getRemoteAddress() {
120 return remoteAddress;
121 }
122
123 void closeNow(ChannelFuture future) {
124 LocalAddress localAddress = this.localAddress;
125 try {
126
127 if (!setClosed()) {
128 return;
129 }
130
131 DefaultLocalChannel pairedChannel = this.pairedChannel;
132 if (pairedChannel != null) {
133 this.pairedChannel = null;
134 fireChannelDisconnected(this);
135 fireChannelUnbound(this);
136 }
137 fireChannelClosed(this);
138
139
140 if (pairedChannel == null || !pairedChannel.setClosed()) {
141 return;
142 }
143
144 DefaultLocalChannel me = pairedChannel.pairedChannel;
145 if (me != null) {
146 pairedChannel.pairedChannel = null;
147 fireChannelDisconnected(pairedChannel);
148 fireChannelUnbound(pairedChannel);
149 }
150 fireChannelClosed(pairedChannel);
151 } finally {
152 future.setSuccess();
153 if (localAddress != null && getParent() == null) {
154 LocalChannelRegistry.unregister(localAddress);
155 }
156 }
157 }
158
159 void flushWriteBuffer() {
160 DefaultLocalChannel pairedChannel = this.pairedChannel;
161 if (pairedChannel != null) {
162 if (pairedChannel.isConnected()) {
163
164
165 if (!delivering.get()) {
166 delivering.set(true);
167 try {
168 for (;;) {
169 MessageEvent e = writeBuffer.poll();
170 if (e == null) {
171 break;
172 }
173
174 e.getFuture().setSuccess();
175 fireMessageReceived(pairedChannel, e.getMessage());
176 fireWriteComplete(this, 1);
177 }
178 } finally {
179 delivering.set(false);
180 }
181 }
182 } else {
183
184
185 }
186 } else {
187
188 Exception cause;
189 if (isOpen()) {
190 cause = new NotYetConnectedException();
191 } else {
192 cause = new ClosedChannelException();
193 }
194
195 for (;;) {
196 MessageEvent e = writeBuffer.poll();
197 if (e == null) {
198 break;
199 }
200
201 e.getFuture().setFailure(cause);
202 fireExceptionCaught(this, cause);
203 }
204 }
205 }
206 }