1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.Socket;
22 import java.net.SocketAddress;
23 import java.net.SocketTimeoutException;
24 import java.util.concurrent.Executor;
25
26 import org.jboss.netty.channel.Channel;
27 import org.jboss.netty.channel.ChannelEvent;
28 import org.jboss.netty.channel.ChannelFuture;
29 import org.jboss.netty.channel.ChannelPipeline;
30 import org.jboss.netty.channel.ChannelState;
31 import org.jboss.netty.channel.ChannelStateEvent;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.logging.InternalLogger;
34 import org.jboss.netty.logging.InternalLoggerFactory;
35 import org.jboss.netty.util.ThreadRenamingRunnable;
36 import org.jboss.netty.util.internal.DeadLockProofWorker;
37
38 class OioServerSocketPipelineSink extends AbstractOioChannelSink {
39
40 static final InternalLogger logger =
41 InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class);
42
43 final Executor workerExecutor;
44
45 OioServerSocketPipelineSink(Executor workerExecutor) {
46 this.workerExecutor = workerExecutor;
47 }
48
49 public void eventSunk(
50 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
51 Channel channel = e.getChannel();
52 if (channel instanceof OioServerSocketChannel) {
53 handleServerSocket(e);
54 } else if (channel instanceof OioAcceptedSocketChannel) {
55 handleAcceptedSocket(e);
56 }
57 }
58
59 private void handleServerSocket(ChannelEvent e) {
60 if (!(e instanceof ChannelStateEvent)) {
61 return;
62 }
63
64 ChannelStateEvent event = (ChannelStateEvent) e;
65 OioServerSocketChannel channel =
66 (OioServerSocketChannel) event.getChannel();
67 ChannelFuture future = event.getFuture();
68 ChannelState state = event.getState();
69 Object value = event.getValue();
70
71 switch (state) {
72 case OPEN:
73 if (Boolean.FALSE.equals(value)) {
74 close(channel, future);
75 }
76 break;
77 case BOUND:
78 if (value != null) {
79 bind(channel, future, (SocketAddress) value);
80 } else {
81 close(channel, future);
82 }
83 break;
84 }
85 }
86
87 private static void handleAcceptedSocket(ChannelEvent e) {
88 if (e instanceof ChannelStateEvent) {
89 ChannelStateEvent event = (ChannelStateEvent) e;
90 OioAcceptedSocketChannel channel =
91 (OioAcceptedSocketChannel) event.getChannel();
92 ChannelFuture future = event.getFuture();
93 ChannelState state = event.getState();
94 Object value = event.getValue();
95
96 switch (state) {
97 case OPEN:
98 if (Boolean.FALSE.equals(value)) {
99 AbstractOioWorker.close(channel, future);
100 }
101 break;
102 case BOUND:
103 case CONNECTED:
104 if (value == null) {
105 AbstractOioWorker.close(channel, future);
106 }
107 break;
108 case INTEREST_OPS:
109 AbstractOioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
110 break;
111 }
112 } else if (e instanceof MessageEvent) {
113 MessageEvent event = (MessageEvent) e;
114 OioSocketChannel channel = (OioSocketChannel) event.getChannel();
115 ChannelFuture future = event.getFuture();
116 Object message = event.getMessage();
117 OioWorker.write(channel, future, message);
118 }
119 }
120
121 private void bind(
122 OioServerSocketChannel channel, ChannelFuture future,
123 SocketAddress localAddress) {
124
125 boolean bound = false;
126 boolean bossStarted = false;
127 try {
128 channel.socket.bind(localAddress, channel.getConfig().getBacklog());
129 bound = true;
130
131 future.setSuccess();
132 localAddress = channel.getLocalAddress();
133 fireChannelBound(channel, localAddress);
134
135 Executor bossExecutor =
136 ((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
137 DeadLockProofWorker.start(
138 bossExecutor,
139 new ThreadRenamingRunnable(
140 new Boss(channel),
141 "Old I/O server boss (" + channel + ')'));
142 bossStarted = true;
143 } catch (Throwable t) {
144 future.setFailure(t);
145 fireExceptionCaught(channel, t);
146 } finally {
147 if (!bossStarted && bound) {
148 close(channel, future);
149 }
150 }
151 }
152
153 private static void close(OioServerSocketChannel channel, ChannelFuture future) {
154 boolean bound = channel.isBound();
155 try {
156 channel.socket.close();
157
158
159
160
161 channel.shutdownLock.lock();
162 try {
163 if (channel.setClosed()) {
164 future.setSuccess();
165 if (bound) {
166 fireChannelUnbound(channel);
167 }
168 fireChannelClosed(channel);
169 } else {
170 future.setSuccess();
171 }
172 } finally {
173 channel.shutdownLock.unlock();
174 }
175 } catch (Throwable t) {
176 future.setFailure(t);
177 fireExceptionCaught(channel, t);
178 }
179 }
180
181 private final class Boss implements Runnable {
182 private final OioServerSocketChannel channel;
183
184 Boss(OioServerSocketChannel channel) {
185 this.channel = channel;
186 }
187
188 public void run() {
189 channel.shutdownLock.lock();
190 try {
191 while (channel.isBound()) {
192 try {
193 Socket acceptedSocket = channel.socket.accept();
194 try {
195 ChannelPipeline pipeline =
196 channel.getConfig().getPipelineFactory().getPipeline();
197 final OioAcceptedSocketChannel acceptedChannel =
198 new OioAcceptedSocketChannel(
199 channel,
200 channel.getFactory(),
201 pipeline,
202 OioServerSocketPipelineSink.this,
203 acceptedSocket);
204 DeadLockProofWorker.start(
205 workerExecutor,
206 new ThreadRenamingRunnable(
207 new OioWorker(acceptedChannel),
208 "Old I/O server worker (parentId: " +
209 channel.getId() + ", " + channel + ')'));
210 } catch (Exception e) {
211 if (logger.isWarnEnabled()) {
212 logger.warn(
213 "Failed to initialize an accepted socket.", e);
214 }
215
216 try {
217 acceptedSocket.close();
218 } catch (IOException e2) {
219 if (logger.isWarnEnabled()) {
220 logger.warn(
221 "Failed to close a partially accepted socket.",
222 e2);
223 }
224
225 }
226 }
227 } catch (SocketTimeoutException e) {
228
229 } catch (Throwable e) {
230
231
232 if (!channel.socket.isBound() || channel.socket.isClosed()) {
233 break;
234 }
235 if (logger.isWarnEnabled()) {
236 logger.warn(
237 "Failed to accept a connection.", e);
238 }
239 try {
240 Thread.sleep(1000);
241 } catch (InterruptedException e1) {
242
243 }
244 }
245 }
246 } finally {
247 channel.shutdownLock.unlock();
248 }
249 }
250 }
251 }