1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.Socket;
22 import java.net.SocketAddress;
23 import java.net.SocketTimeoutException;
24 import java.util.concurrent.Executor;
25
26 import org.jboss.netty.channel.Channel;
27 import org.jboss.netty.channel.ChannelEvent;
28 import org.jboss.netty.channel.ChannelFuture;
29 import org.jboss.netty.channel.ChannelPipeline;
30 import org.jboss.netty.channel.ChannelState;
31 import org.jboss.netty.channel.ChannelStateEvent;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.logging.InternalLogger;
34 import org.jboss.netty.logging.InternalLoggerFactory;
35 import org.jboss.netty.util.ThreadNameDeterminer;
36 import org.jboss.netty.util.ThreadRenamingRunnable;
37 import org.jboss.netty.util.internal.DeadLockProofWorker;
38
39 class OioServerSocketPipelineSink extends AbstractOioChannelSink {
40
41 static final InternalLogger logger =
42 InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class);
43
44 final Executor workerExecutor;
45 private final ThreadNameDeterminer determiner;
46
47 OioServerSocketPipelineSink(Executor workerExecutor, ThreadNameDeterminer determiner) {
48 this.workerExecutor = workerExecutor;
49 this.determiner = determiner;
50 }
51
52 public void eventSunk(
53 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
54 Channel channel = e.getChannel();
55 if (channel instanceof OioServerSocketChannel) {
56 handleServerSocket(e);
57 } else if (channel instanceof OioAcceptedSocketChannel) {
58 handleAcceptedSocket(e);
59 }
60 }
61
62 private void handleServerSocket(ChannelEvent e) {
63 if (!(e instanceof ChannelStateEvent)) {
64 return;
65 }
66
67 ChannelStateEvent event = (ChannelStateEvent) e;
68 OioServerSocketChannel channel =
69 (OioServerSocketChannel) event.getChannel();
70 ChannelFuture future = event.getFuture();
71 ChannelState state = event.getState();
72 Object value = event.getValue();
73
74 switch (state) {
75 case OPEN:
76 if (Boolean.FALSE.equals(value)) {
77 close(channel, future);
78 }
79 break;
80 case BOUND:
81 if (value != null) {
82 bind(channel, future, (SocketAddress) value);
83 } else {
84 close(channel, future);
85 }
86 break;
87 }
88 }
89
90 private static void handleAcceptedSocket(ChannelEvent e) {
91 if (e instanceof ChannelStateEvent) {
92 ChannelStateEvent event = (ChannelStateEvent) e;
93 OioAcceptedSocketChannel channel =
94 (OioAcceptedSocketChannel) event.getChannel();
95 ChannelFuture future = event.getFuture();
96 ChannelState state = event.getState();
97 Object value = event.getValue();
98
99 switch (state) {
100 case OPEN:
101 if (Boolean.FALSE.equals(value)) {
102 AbstractOioWorker.close(channel, future);
103 }
104 break;
105 case BOUND:
106 case CONNECTED:
107 if (value == null) {
108 AbstractOioWorker.close(channel, future);
109 }
110 break;
111 case INTEREST_OPS:
112 AbstractOioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
113 break;
114 }
115 } else if (e instanceof MessageEvent) {
116 MessageEvent event = (MessageEvent) e;
117 OioSocketChannel channel = (OioSocketChannel) event.getChannel();
118 ChannelFuture future = event.getFuture();
119 Object message = event.getMessage();
120 OioWorker.write(channel, future, message);
121 }
122 }
123
124 private void bind(
125 OioServerSocketChannel channel, ChannelFuture future,
126 SocketAddress localAddress) {
127
128 boolean bound = false;
129 boolean bossStarted = false;
130 try {
131 channel.socket.bind(localAddress, channel.getConfig().getBacklog());
132 bound = true;
133
134 future.setSuccess();
135 localAddress = channel.getLocalAddress();
136 fireChannelBound(channel, localAddress);
137
138 Executor bossExecutor =
139 ((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
140 DeadLockProofWorker.start(
141 bossExecutor,
142 new ThreadRenamingRunnable(
143 new Boss(channel),
144 "Old I/O server boss (" + channel + ')',
145 determiner));
146 bossStarted = true;
147 } catch (Throwable t) {
148 future.setFailure(t);
149 fireExceptionCaught(channel, t);
150 } finally {
151 if (!bossStarted && bound) {
152 close(channel, future);
153 }
154 }
155 }
156
157 private static void close(OioServerSocketChannel channel, ChannelFuture future) {
158 boolean bound = channel.isBound();
159 try {
160 channel.socket.close();
161
162
163
164
165 channel.shutdownLock.lock();
166 try {
167 if (channel.setClosed()) {
168 future.setSuccess();
169 if (bound) {
170 fireChannelUnbound(channel);
171 }
172 fireChannelClosed(channel);
173 } else {
174 future.setSuccess();
175 }
176 } finally {
177 channel.shutdownLock.unlock();
178 }
179 } catch (Throwable t) {
180 future.setFailure(t);
181 fireExceptionCaught(channel, t);
182 }
183 }
184
185 private final class Boss implements Runnable {
186 private final OioServerSocketChannel channel;
187
188 Boss(OioServerSocketChannel channel) {
189 this.channel = channel;
190 }
191
192 public void run() {
193 channel.shutdownLock.lock();
194 try {
195 while (channel.isBound()) {
196 try {
197 Socket acceptedSocket = channel.socket.accept();
198 try {
199 ChannelPipeline pipeline =
200 channel.getConfig().getPipelineFactory().getPipeline();
201 final OioAcceptedSocketChannel acceptedChannel =
202 new OioAcceptedSocketChannel(
203 channel,
204 channel.getFactory(),
205 pipeline,
206 OioServerSocketPipelineSink.this,
207 acceptedSocket);
208 DeadLockProofWorker.start(
209 workerExecutor,
210 new ThreadRenamingRunnable(
211 new OioWorker(acceptedChannel),
212 "Old I/O server worker (parentId: " +
213 channel.getId() + ", " + channel + ')',
214 determiner));
215 } catch (Exception e) {
216 if (logger.isWarnEnabled()) {
217 logger.warn(
218 "Failed to initialize an accepted socket.", e);
219 }
220
221 try {
222 acceptedSocket.close();
223 } catch (IOException e2) {
224 if (logger.isWarnEnabled()) {
225 logger.warn(
226 "Failed to close a partially accepted socket.",
227 e2);
228 }
229 }
230 }
231 } catch (SocketTimeoutException e) {
232
233 } catch (Throwable e) {
234
235
236 if (!channel.socket.isBound() || channel.socket.isClosed()) {
237 break;
238 }
239 if (logger.isWarnEnabled()) {
240 logger.warn(
241 "Failed to accept a connection.", e);
242 }
243 try {
244 Thread.sleep(1000);
245 } catch (InterruptedException e1) {
246
247 }
248 }
249 }
250 } finally {
251 channel.shutdownLock.unlock();
252 }
253 }
254 }
255 }