View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.ChannelPipeline;
21  import org.jboss.netty.channel.ChannelSink;
22  import org.jboss.netty.util.ThreadNameDeterminer;
23  import org.jboss.netty.util.ThreadRenamingRunnable;
24  
25  import java.io.IOException;
26  import java.net.SocketAddress;
27  import java.net.SocketTimeoutException;
28  import java.nio.channels.CancelledKeyException;
29  import java.nio.channels.ClosedChannelException;
30  import java.nio.channels.SelectionKey;
31  import java.nio.channels.Selector;
32  import java.nio.channels.SocketChannel;
33  import java.util.Iterator;
34  import java.util.Set;
35  import java.util.concurrent.Executor;
36  
37  import static org.jboss.netty.channel.Channels.*;
38  
39  /**
40   * Boss implementation which handles accepting of new connections
41   */
42  public final class NioServerBoss extends AbstractNioSelector implements Boss {
43  
44      NioServerBoss(Executor bossExecutor) {
45          super(bossExecutor);
46      }
47  
48      NioServerBoss(Executor bossExecutor, ThreadNameDeterminer determiner) {
49          super(bossExecutor, determiner);
50      }
51  
52      void bind(final NioServerSocketChannel channel, final ChannelFuture future,
53                final SocketAddress localAddress) {
54          registerTask(new RegisterTask(channel, future, localAddress));
55      }
56  
57      @Override
58      protected void close(SelectionKey k) {
59          NioServerSocketChannel ch = (NioServerSocketChannel) k.attachment();
60          close(ch, succeededFuture(ch));
61      }
62  
63      void close(NioServerSocketChannel channel, ChannelFuture future) {
64          boolean bound = channel.isBound();
65  
66          try {
67              channel.socket.close();
68              increaseCancelledKeys();
69  
70              if (channel.setClosed()) {
71                  future.setSuccess();
72  
73                  if (bound) {
74                      fireChannelUnbound(channel);
75                  }
76                  fireChannelClosed(channel);
77              } else {
78                  future.setSuccess();
79              }
80          } catch (Throwable t) {
81              future.setFailure(t);
82              fireExceptionCaught(channel, t);
83          }
84      }
85  
86      @Override
87      protected void process(Selector selector) {
88          Set<SelectionKey> selectedKeys = selector.selectedKeys();
89          if (selectedKeys.isEmpty()) {
90              return;
91          }
92          for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
93              SelectionKey k = i.next();
94              i.remove();
95              NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();
96  
97              try {
98                  // accept connections in a for loop until no new connection is ready
99                  for (;;) {
100                     SocketChannel acceptedSocket = channel.socket.accept();
101                     if (acceptedSocket == null) {
102                         break;
103                     }
104                     registerAcceptedChannel(channel, acceptedSocket, thread);
105                 }
106             } catch (CancelledKeyException e) {
107                 // Raised by accept() when the server socket was closed.
108                 k.cancel();
109                 channel.close();
110             } catch (SocketTimeoutException e) {
111                 // Thrown every second to get ClosedChannelException
112                 // raised.
113             } catch (ClosedChannelException e) {
114                 // Closed as requested.
115             } catch (Throwable t) {
116                 if (logger.isWarnEnabled()) {
117                     logger.warn(
118                             "Failed to accept a connection.", t);
119                 }
120 
121                 try {
122                     Thread.sleep(1000);
123                 } catch (InterruptedException e1) {
124                     // Ignore
125                 }
126             }
127         }
128     }
129 
130     private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
131                                          Thread currentThread) {
132         try {
133             ChannelSink sink = parent.getPipeline().getSink();
134             ChannelPipeline pipeline =
135                     parent.getConfig().getPipelineFactory().getPipeline();
136             NioWorker worker = parent.workerPool.nextWorker();
137             worker.register(new NioAcceptedSocketChannel(
138                     parent.getFactory(), pipeline, parent, sink
139                     , acceptedSocket,
140                     worker, currentThread), null);
141         } catch (Exception e) {
142             if (logger.isWarnEnabled()) {
143                 logger.warn(
144                         "Failed to initialize an accepted socket.", e);
145             }
146 
147             try {
148                 acceptedSocket.close();
149             } catch (IOException e2) {
150                 if (logger.isWarnEnabled()) {
151                     logger.warn(
152                             "Failed to close a partially accepted socket.",
153                             e2);
154                 }
155             }
156         }
157     }
158 
159     @Override
160     protected int select(Selector selector) throws IOException {
161         // Just do a blocking select without any timeout
162         // as this thread does not execute anything else.
163         return selector.select();
164     }
165 
166     @Override
167     protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
168         return new ThreadRenamingRunnable(this,
169                 "New I/O server boss #" + id, determiner);
170     }
171 
172     @Override
173     protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
174         return new RegisterTask((NioServerSocketChannel) channel, future, null);
175     }
176 
177     private final class RegisterTask implements Runnable {
178         private final NioServerSocketChannel channel;
179         private final ChannelFuture future;
180         private final SocketAddress localAddress;
181 
182         public RegisterTask(final NioServerSocketChannel channel, final ChannelFuture future,
183                             final SocketAddress localAddress) {
184             this.channel = channel;
185             this.future = future;
186             this.localAddress = localAddress;
187         }
188 
189         public void run() {
190             boolean bound = false;
191             boolean registered = false;
192             try {
193                 channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
194                 bound = true;
195 
196                 future.setSuccess();
197                 fireChannelBound(channel, channel.getLocalAddress());
198                 channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);
199 
200                 registered = true;
201             } catch (Throwable t) {
202                 future.setFailure(t);
203                 fireExceptionCaught(channel, t);
204             } finally {
205                 if (!registered && bound) {
206                     close(channel, future);
207                 }
208             }
209         }
210     }
211 }