1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.ChannelPipeline;
21 import org.jboss.netty.channel.ChannelSink;
22 import org.jboss.netty.util.ThreadNameDeterminer;
23 import org.jboss.netty.util.ThreadRenamingRunnable;
24
25 import java.io.IOException;
26 import java.net.SocketAddress;
27 import java.net.SocketTimeoutException;
28 import java.nio.channels.CancelledKeyException;
29 import java.nio.channels.ClosedChannelException;
30 import java.nio.channels.SelectionKey;
31 import java.nio.channels.Selector;
32 import java.nio.channels.SocketChannel;
33 import java.util.Iterator;
34 import java.util.Set;
35 import java.util.concurrent.Executor;
36
37 import static org.jboss.netty.channel.Channels.*;
38
39
40
41
42 public final class NioServerBoss extends AbstractNioSelector implements Boss {
43
44 NioServerBoss(Executor bossExecutor) {
45 super(bossExecutor);
46 }
47
48 NioServerBoss(Executor bossExecutor, ThreadNameDeterminer determiner) {
49 super(bossExecutor, determiner);
50 }
51
52 void bind(final NioServerSocketChannel channel, final ChannelFuture future,
53 final SocketAddress localAddress) {
54 registerTask(new RegisterTask(channel, future, localAddress));
55 }
56
57 @Override
58 protected void close(SelectionKey k) {
59 NioServerSocketChannel ch = (NioServerSocketChannel) k.attachment();
60 close(ch, succeededFuture(ch));
61 }
62
63 void close(NioServerSocketChannel channel, ChannelFuture future) {
64 boolean bound = channel.isBound();
65
66 try {
67 channel.socket.close();
68 increaseCancelledKeys();
69
70 if (channel.setClosed()) {
71 future.setSuccess();
72
73 if (bound) {
74 fireChannelUnbound(channel);
75 }
76 fireChannelClosed(channel);
77 } else {
78 future.setSuccess();
79 }
80 } catch (Throwable t) {
81 future.setFailure(t);
82 fireExceptionCaught(channel, t);
83 }
84 }
85
86 @Override
87 protected void process(Selector selector) {
88 Set<SelectionKey> selectedKeys = selector.selectedKeys();
89 if (selectedKeys.isEmpty()) {
90 return;
91 }
92 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
93 SelectionKey k = i.next();
94 i.remove();
95 NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();
96
97 try {
98
99 for (;;) {
100 SocketChannel acceptedSocket = channel.socket.accept();
101 if (acceptedSocket == null) {
102 break;
103 }
104 registerAcceptedChannel(channel, acceptedSocket, thread);
105 }
106 } catch (CancelledKeyException e) {
107
108 k.cancel();
109 channel.close();
110 } catch (SocketTimeoutException e) {
111
112
113 } catch (ClosedChannelException e) {
114
115 } catch (Throwable t) {
116 if (logger.isWarnEnabled()) {
117 logger.warn(
118 "Failed to accept a connection.", t);
119 }
120
121 try {
122 Thread.sleep(1000);
123 } catch (InterruptedException e1) {
124
125 }
126 }
127 }
128 }
129
130 private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
131 Thread currentThread) {
132 try {
133 ChannelSink sink = parent.getPipeline().getSink();
134 ChannelPipeline pipeline =
135 parent.getConfig().getPipelineFactory().getPipeline();
136 NioWorker worker = parent.workerPool.nextWorker();
137 worker.register(new NioAcceptedSocketChannel(
138 parent.getFactory(), pipeline, parent, sink
139 , acceptedSocket,
140 worker, currentThread), null);
141 } catch (Exception e) {
142 if (logger.isWarnEnabled()) {
143 logger.warn(
144 "Failed to initialize an accepted socket.", e);
145 }
146
147 try {
148 acceptedSocket.close();
149 } catch (IOException e2) {
150 if (logger.isWarnEnabled()) {
151 logger.warn(
152 "Failed to close a partially accepted socket.",
153 e2);
154 }
155 }
156 }
157 }
158
159 @Override
160 protected int select(Selector selector) throws IOException {
161
162
163 return selector.select();
164 }
165
166 @Override
167 protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
168 return new ThreadRenamingRunnable(this,
169 "New I/O server boss #" + id, determiner);
170 }
171
172 @Override
173 protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
174 return new RegisterTask((NioServerSocketChannel) channel, future, null);
175 }
176
177 private final class RegisterTask implements Runnable {
178 private final NioServerSocketChannel channel;
179 private final ChannelFuture future;
180 private final SocketAddress localAddress;
181
182 public RegisterTask(final NioServerSocketChannel channel, final ChannelFuture future,
183 final SocketAddress localAddress) {
184 this.channel = channel;
185 this.future = future;
186 this.localAddress = localAddress;
187 }
188
189 public void run() {
190 boolean bound = false;
191 boolean registered = false;
192 try {
193 channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
194 bound = true;
195
196 future.setSuccess();
197 fireChannelBound(channel, channel.getLocalAddress());
198 channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);
199
200 registered = true;
201 } catch (Throwable t) {
202 future.setFailure(t);
203 fireExceptionCaught(channel, t);
204 } finally {
205 if (!registered && bound) {
206 close(channel, future);
207 }
208 }
209 }
210 }
211 }