View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.SocketAddress;
22  import java.net.SocketTimeoutException;
23  import java.nio.channels.CancelledKeyException;
24  import java.nio.channels.ClosedChannelException;
25  import java.nio.channels.ClosedSelectorException;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.SocketChannel;
29  import java.util.concurrent.Executor;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  import org.jboss.netty.channel.Channel;
33  import org.jboss.netty.channel.ChannelEvent;
34  import org.jboss.netty.channel.ChannelFuture;
35  import org.jboss.netty.channel.ChannelPipeline;
36  import org.jboss.netty.channel.ChannelState;
37  import org.jboss.netty.channel.ChannelStateEvent;
38  import org.jboss.netty.channel.MessageEvent;
39  import org.jboss.netty.logging.InternalLogger;
40  import org.jboss.netty.logging.InternalLoggerFactory;
41  import org.jboss.netty.util.ThreadRenamingRunnable;
42  import org.jboss.netty.util.internal.DeadLockProofWorker;
43  
44  class NioServerSocketPipelineSink extends AbstractNioChannelSink {
45  
46      private static final AtomicInteger nextId = new AtomicInteger();
47  
48      static final InternalLogger logger =
49          InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
50  
51      final int id = nextId.incrementAndGet();
52  
53      private final WorkerPool<NioWorker> workerPool;
54  
55      NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
56          this.workerPool = workerPool;
57      }
58  
59  
60      public void eventSunk(
61              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
62          Channel channel = e.getChannel();
63          if (channel instanceof NioServerSocketChannel) {
64              handleServerSocket(e);
65          } else if (channel instanceof NioSocketChannel) {
66              handleAcceptedSocket(e);
67          }
68      }
69  
70      private void handleServerSocket(ChannelEvent e) {
71          if (!(e instanceof ChannelStateEvent)) {
72              return;
73          }
74  
75          ChannelStateEvent event = (ChannelStateEvent) e;
76          NioServerSocketChannel channel =
77              (NioServerSocketChannel) event.getChannel();
78          ChannelFuture future = event.getFuture();
79          ChannelState state = event.getState();
80          Object value = event.getValue();
81  
82          switch (state) {
83          case OPEN:
84              if (Boolean.FALSE.equals(value)) {
85                  close(channel, future);
86              }
87              break;
88          case BOUND:
89              if (value != null) {
90                  bind(channel, future, (SocketAddress) value);
91              } else {
92                  close(channel, future);
93              }
94              break;
95          default:
96              break;
97          }
98      }
99  
100     private static void handleAcceptedSocket(ChannelEvent e) {
101         if (e instanceof ChannelStateEvent) {
102             ChannelStateEvent event = (ChannelStateEvent) e;
103             NioSocketChannel channel = (NioSocketChannel) event.getChannel();
104             ChannelFuture future = event.getFuture();
105             ChannelState state = event.getState();
106             Object value = event.getValue();
107 
108             switch (state) {
109             case OPEN:
110                 if (Boolean.FALSE.equals(value)) {
111                     channel.worker.close(channel, future);
112                 }
113                 break;
114             case BOUND:
115             case CONNECTED:
116                 if (value == null) {
117                     channel.worker.close(channel, future);
118                 }
119                 break;
120             case INTEREST_OPS:
121                 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
122                 break;
123             }
124         } else if (e instanceof MessageEvent) {
125             MessageEvent event = (MessageEvent) e;
126             NioSocketChannel channel = (NioSocketChannel) event.getChannel();
127             boolean offered = channel.writeBufferQueue.offer(event);
128             assert offered;
129             channel.worker.writeFromUserCode(channel);
130         }
131     }
132 
133     private void bind(
134             NioServerSocketChannel channel, ChannelFuture future,
135             SocketAddress localAddress) {
136 
137         boolean bound = false;
138         boolean bossStarted = false;
139         try {
140             channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
141             bound = true;
142 
143             future.setSuccess();
144             fireChannelBound(channel, channel.getLocalAddress());
145 
146             Executor bossExecutor =
147                 ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
148             DeadLockProofWorker.start(bossExecutor,
149                     new ThreadRenamingRunnable(new Boss(channel),
150                             "New I/O server boss #" + id + " (" + channel + ')'));
151             bossStarted = true;
152         } catch (Throwable t) {
153             future.setFailure(t);
154             fireExceptionCaught(channel, t);
155         } finally {
156             if (!bossStarted && bound) {
157                 close(channel, future);
158             }
159         }
160     }
161 
162     private static void close(NioServerSocketChannel channel, ChannelFuture future) {
163         boolean bound = channel.isBound();
164         try {
165             if (channel.socket.isOpen()) {
166                 channel.socket.close();
167                 Selector selector = channel.selector;
168                 if (selector != null) {
169                     selector.wakeup();
170                 }
171             }
172 
173             // Make sure the boss thread is not running so that that the future
174             // is notified after a new connection cannot be accepted anymore.
175             // See NETTY-256 for more information.
176             channel.shutdownLock.lock();
177             try {
178                 if (channel.setClosed()) {
179                     future.setSuccess();
180                     if (bound) {
181                         fireChannelUnbound(channel);
182                     }
183                     fireChannelClosed(channel);
184                 } else {
185                     future.setSuccess();
186                 }
187             } finally {
188                 channel.shutdownLock.unlock();
189             }
190         } catch (Throwable t) {
191             future.setFailure(t);
192             fireExceptionCaught(channel, t);
193         }
194     }
195 
196     NioWorker nextWorker() {
197         return workerPool.nextWorker();
198     }
199 
200     private final class Boss implements Runnable {
201         private final Selector selector;
202         private final NioServerSocketChannel channel;
203 
204         Boss(NioServerSocketChannel channel) throws IOException {
205             this.channel = channel;
206 
207             selector = Selector.open();
208 
209             boolean registered = false;
210             try {
211                 channel.socket.register(selector, SelectionKey.OP_ACCEPT);
212                 registered = true;
213             } finally {
214                 if (!registered) {
215                     closeSelector();
216                 }
217             }
218 
219             channel.selector = selector;
220         }
221 
222         public void run() {
223             final Thread currentThread = Thread.currentThread();
224 
225             channel.shutdownLock.lock();
226             try {
227                 for (;;) {
228                     try {
229                         // Just do a blocking select without any timeout
230                         // as this thread does not execute anything else.
231                         selector.select();
232                         // There was something selected if we reach this point, so clear
233                         // the selected keys
234                         selector.selectedKeys().clear();
235 
236                         // accept connections in a for loop until no new connection is ready
237                         for (;;) {
238                             SocketChannel acceptedSocket = channel.socket.accept();
239                             if (acceptedSocket == null) {
240                                 break;
241                             }
242                             registerAcceptedChannel(acceptedSocket, currentThread);
243 
244                         }
245 
246                     } catch (SocketTimeoutException e) {
247                         // Thrown every second to get ClosedChannelException
248                         // raised.
249                     } catch (CancelledKeyException e) {
250                         // Raised by accept() when the server socket was closed.
251                     } catch (ClosedSelectorException e) {
252                         // Raised by accept() when the server socket was closed.
253                     } catch (ClosedChannelException e) {
254                         // Closed as requested.
255                         break;
256                     } catch (Throwable e) {
257                         if (logger.isWarnEnabled()) {
258                             logger.warn(
259                                     "Failed to accept a connection.", e);
260                         }
261 
262                         try {
263                             Thread.sleep(1000);
264                         } catch (InterruptedException e1) {
265                             // Ignore
266                         }
267                     }
268                 }
269             } finally {
270                 channel.shutdownLock.unlock();
271                 closeSelector();
272             }
273         }
274 
275         private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
276             try {
277                 ChannelPipeline pipeline =
278                     channel.getConfig().getPipelineFactory().getPipeline();
279                 NioWorker worker = nextWorker();
280                 worker.register(new NioAcceptedSocketChannel(
281                         channel.getFactory(), pipeline, channel,
282                         NioServerSocketPipelineSink.this, acceptedSocket,
283                         worker, currentThread), null);
284             } catch (Exception e) {
285                 if (logger.isWarnEnabled()) {
286                     logger.warn(
287                             "Failed to initialize an accepted socket.", e);
288                 }
289 
290                 try {
291                     acceptedSocket.close();
292                 } catch (IOException e2) {
293                     if (logger.isWarnEnabled()) {
294                         logger.warn(
295                                 "Failed to close a partially accepted socket.",
296                                 e2);
297                     }
298 
299                 }
300             }
301         }
302 
303         private void closeSelector() {
304             channel.selector = null;
305             try {
306                 selector.close();
307             } catch (Exception e) {
308                 if (logger.isWarnEnabled()) {
309                     logger.warn("Failed to close a selector.", e);
310                 }
311             }
312         }
313     }
314 }