1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketAddress;
22 import java.net.SocketTimeoutException;
23 import java.nio.channels.CancelledKeyException;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.ClosedSelectorException;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.SocketChannel;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.jboss.netty.channel.Channel;
33 import org.jboss.netty.channel.ChannelEvent;
34 import org.jboss.netty.channel.ChannelFuture;
35 import org.jboss.netty.channel.ChannelPipeline;
36 import org.jboss.netty.channel.ChannelState;
37 import org.jboss.netty.channel.ChannelStateEvent;
38 import org.jboss.netty.channel.MessageEvent;
39 import org.jboss.netty.logging.InternalLogger;
40 import org.jboss.netty.logging.InternalLoggerFactory;
41 import org.jboss.netty.util.ThreadRenamingRunnable;
42 import org.jboss.netty.util.internal.DeadLockProofWorker;
43
44 class NioServerSocketPipelineSink extends AbstractNioChannelSink {
45
46 private static final AtomicInteger nextId = new AtomicInteger();
47
48 static final InternalLogger logger =
49 InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
50
51 final int id = nextId.incrementAndGet();
52
53 private final WorkerPool<NioWorker> workerPool;
54
55 NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
56 this.workerPool = workerPool;
57 }
58
59
60 public void eventSunk(
61 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
62 Channel channel = e.getChannel();
63 if (channel instanceof NioServerSocketChannel) {
64 handleServerSocket(e);
65 } else if (channel instanceof NioSocketChannel) {
66 handleAcceptedSocket(e);
67 }
68 }
69
70 private void handleServerSocket(ChannelEvent e) {
71 if (!(e instanceof ChannelStateEvent)) {
72 return;
73 }
74
75 ChannelStateEvent event = (ChannelStateEvent) e;
76 NioServerSocketChannel channel =
77 (NioServerSocketChannel) event.getChannel();
78 ChannelFuture future = event.getFuture();
79 ChannelState state = event.getState();
80 Object value = event.getValue();
81
82 switch (state) {
83 case OPEN:
84 if (Boolean.FALSE.equals(value)) {
85 close(channel, future);
86 }
87 break;
88 case BOUND:
89 if (value != null) {
90 bind(channel, future, (SocketAddress) value);
91 } else {
92 close(channel, future);
93 }
94 break;
95 default:
96 break;
97 }
98 }
99
100 private static void handleAcceptedSocket(ChannelEvent e) {
101 if (e instanceof ChannelStateEvent) {
102 ChannelStateEvent event = (ChannelStateEvent) e;
103 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
104 ChannelFuture future = event.getFuture();
105 ChannelState state = event.getState();
106 Object value = event.getValue();
107
108 switch (state) {
109 case OPEN:
110 if (Boolean.FALSE.equals(value)) {
111 channel.worker.close(channel, future);
112 }
113 break;
114 case BOUND:
115 case CONNECTED:
116 if (value == null) {
117 channel.worker.close(channel, future);
118 }
119 break;
120 case INTEREST_OPS:
121 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
122 break;
123 }
124 } else if (e instanceof MessageEvent) {
125 MessageEvent event = (MessageEvent) e;
126 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
127 boolean offered = channel.writeBufferQueue.offer(event);
128 assert offered;
129 channel.worker.writeFromUserCode(channel);
130 }
131 }
132
133 private void bind(
134 NioServerSocketChannel channel, ChannelFuture future,
135 SocketAddress localAddress) {
136
137 boolean bound = false;
138 boolean bossStarted = false;
139 try {
140 channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
141 bound = true;
142
143 future.setSuccess();
144 fireChannelBound(channel, channel.getLocalAddress());
145
146 Executor bossExecutor =
147 ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
148 DeadLockProofWorker.start(bossExecutor,
149 new ThreadRenamingRunnable(new Boss(channel),
150 "New I/O server boss #" + id + " (" + channel + ')'));
151 bossStarted = true;
152 } catch (Throwable t) {
153 future.setFailure(t);
154 fireExceptionCaught(channel, t);
155 } finally {
156 if (!bossStarted && bound) {
157 close(channel, future);
158 }
159 }
160 }
161
162 private static void close(NioServerSocketChannel channel, ChannelFuture future) {
163 boolean bound = channel.isBound();
164 try {
165 if (channel.socket.isOpen()) {
166 channel.socket.close();
167 Selector selector = channel.selector;
168 if (selector != null) {
169 selector.wakeup();
170 }
171 }
172
173
174
175
176 channel.shutdownLock.lock();
177 try {
178 if (channel.setClosed()) {
179 future.setSuccess();
180 if (bound) {
181 fireChannelUnbound(channel);
182 }
183 fireChannelClosed(channel);
184 } else {
185 future.setSuccess();
186 }
187 } finally {
188 channel.shutdownLock.unlock();
189 }
190 } catch (Throwable t) {
191 future.setFailure(t);
192 fireExceptionCaught(channel, t);
193 }
194 }
195
196 NioWorker nextWorker() {
197 return workerPool.nextWorker();
198 }
199
200 private final class Boss implements Runnable {
201 private final Selector selector;
202 private final NioServerSocketChannel channel;
203
204 Boss(NioServerSocketChannel channel) throws IOException {
205 this.channel = channel;
206
207 selector = Selector.open();
208
209 boolean registered = false;
210 try {
211 channel.socket.register(selector, SelectionKey.OP_ACCEPT);
212 registered = true;
213 } finally {
214 if (!registered) {
215 closeSelector();
216 }
217 }
218
219 channel.selector = selector;
220 }
221
222 public void run() {
223 final Thread currentThread = Thread.currentThread();
224
225 channel.shutdownLock.lock();
226 try {
227 for (;;) {
228 try {
229
230
231 selector.select();
232
233
234 selector.selectedKeys().clear();
235
236
237 for (;;) {
238 SocketChannel acceptedSocket = channel.socket.accept();
239 if (acceptedSocket == null) {
240 break;
241 }
242 registerAcceptedChannel(acceptedSocket, currentThread);
243
244 }
245
246 } catch (SocketTimeoutException e) {
247
248
249 } catch (CancelledKeyException e) {
250
251 } catch (ClosedSelectorException e) {
252
253 } catch (ClosedChannelException e) {
254
255 break;
256 } catch (Throwable e) {
257 if (logger.isWarnEnabled()) {
258 logger.warn(
259 "Failed to accept a connection.", e);
260 }
261
262 try {
263 Thread.sleep(1000);
264 } catch (InterruptedException e1) {
265
266 }
267 }
268 }
269 } finally {
270 channel.shutdownLock.unlock();
271 closeSelector();
272 }
273 }
274
275 private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
276 try {
277 ChannelPipeline pipeline =
278 channel.getConfig().getPipelineFactory().getPipeline();
279 NioWorker worker = nextWorker();
280 worker.register(new NioAcceptedSocketChannel(
281 channel.getFactory(), pipeline, channel,
282 NioServerSocketPipelineSink.this, acceptedSocket,
283 worker, currentThread), null);
284 } catch (Exception e) {
285 if (logger.isWarnEnabled()) {
286 logger.warn(
287 "Failed to initialize an accepted socket.", e);
288 }
289
290 try {
291 acceptedSocket.close();
292 } catch (IOException e2) {
293 if (logger.isWarnEnabled()) {
294 logger.warn(
295 "Failed to close a partially accepted socket.",
296 e2);
297 }
298
299 }
300 }
301 }
302
303 private void closeSelector() {
304 channel.selector = null;
305 try {
306 selector.close();
307 } catch (Exception e) {
308 if (logger.isWarnEnabled()) {
309 logger.warn("Failed to close a selector.", e);
310 }
311 }
312 }
313 }
314 }