View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.Socket;
22  import java.net.SocketAddress;
23  import java.net.SocketTimeoutException;
24  import java.util.concurrent.Executor;
25  
26  import org.jboss.netty.channel.Channel;
27  import org.jboss.netty.channel.ChannelEvent;
28  import org.jboss.netty.channel.ChannelFuture;
29  import org.jboss.netty.channel.ChannelPipeline;
30  import org.jboss.netty.channel.ChannelState;
31  import org.jboss.netty.channel.ChannelStateEvent;
32  import org.jboss.netty.channel.MessageEvent;
33  import org.jboss.netty.logging.InternalLogger;
34  import org.jboss.netty.logging.InternalLoggerFactory;
35  import org.jboss.netty.util.ThreadRenamingRunnable;
36  import org.jboss.netty.util.internal.DeadLockProofWorker;
37  
38  class OioServerSocketPipelineSink extends AbstractOioChannelSink {
39  
40      static final InternalLogger logger =
41          InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class);
42  
43      final Executor workerExecutor;
44  
45      OioServerSocketPipelineSink(Executor workerExecutor) {
46          this.workerExecutor = workerExecutor;
47      }
48  
49      public void eventSunk(
50              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
51          Channel channel = e.getChannel();
52          if (channel instanceof OioServerSocketChannel) {
53              handleServerSocket(e);
54          } else if (channel instanceof OioAcceptedSocketChannel) {
55              handleAcceptedSocket(e);
56          }
57      }
58  
59      private void handleServerSocket(ChannelEvent e) {
60          if (!(e instanceof ChannelStateEvent)) {
61              return;
62          }
63  
64          ChannelStateEvent event = (ChannelStateEvent) e;
65          OioServerSocketChannel channel =
66              (OioServerSocketChannel) event.getChannel();
67          ChannelFuture future = event.getFuture();
68          ChannelState state = event.getState();
69          Object value = event.getValue();
70  
71          switch (state) {
72          case OPEN:
73              if (Boolean.FALSE.equals(value)) {
74                  close(channel, future);
75              }
76              break;
77          case BOUND:
78              if (value != null) {
79                  bind(channel, future, (SocketAddress) value);
80              } else {
81                  close(channel, future);
82              }
83              break;
84          }
85      }
86  
87      private static void handleAcceptedSocket(ChannelEvent e) {
88          if (e instanceof ChannelStateEvent) {
89              ChannelStateEvent event = (ChannelStateEvent) e;
90              OioAcceptedSocketChannel channel =
91                  (OioAcceptedSocketChannel) event.getChannel();
92              ChannelFuture future = event.getFuture();
93              ChannelState state = event.getState();
94              Object value = event.getValue();
95  
96              switch (state) {
97              case OPEN:
98                  if (Boolean.FALSE.equals(value)) {
99                      AbstractOioWorker.close(channel, future);
100                 }
101                 break;
102             case BOUND:
103             case CONNECTED:
104                 if (value == null) {
105                     AbstractOioWorker.close(channel, future);
106                 }
107                 break;
108             case INTEREST_OPS:
109                 AbstractOioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
110                 break;
111             }
112         } else if (e instanceof MessageEvent) {
113             MessageEvent event = (MessageEvent) e;
114             OioSocketChannel channel = (OioSocketChannel) event.getChannel();
115             ChannelFuture future = event.getFuture();
116             Object message = event.getMessage();
117             OioWorker.write(channel, future, message);
118         }
119     }
120 
121     private void bind(
122             OioServerSocketChannel channel, ChannelFuture future,
123             SocketAddress localAddress) {
124 
125         boolean bound = false;
126         boolean bossStarted = false;
127         try {
128             channel.socket.bind(localAddress, channel.getConfig().getBacklog());
129             bound = true;
130 
131             future.setSuccess();
132             localAddress = channel.getLocalAddress();
133             fireChannelBound(channel, localAddress);
134 
135             Executor bossExecutor =
136                 ((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
137             DeadLockProofWorker.start(
138                     bossExecutor,
139                     new ThreadRenamingRunnable(
140                             new Boss(channel),
141                             "Old I/O server boss (" + channel + ')'));
142             bossStarted = true;
143         } catch (Throwable t) {
144             future.setFailure(t);
145             fireExceptionCaught(channel, t);
146         } finally {
147             if (!bossStarted && bound) {
148                 close(channel, future);
149             }
150         }
151     }
152 
153     private static void close(OioServerSocketChannel channel, ChannelFuture future) {
154         boolean bound = channel.isBound();
155         try {
156             channel.socket.close();
157 
158             // Make sure the boss thread is not running so that that the future
159             // is notified after a new connection cannot be accepted anymore.
160             // See NETTY-256 for more information.
161             channel.shutdownLock.lock();
162             try {
163                 if (channel.setClosed()) {
164                     future.setSuccess();
165                     if (bound) {
166                         fireChannelUnbound(channel);
167                     }
168                     fireChannelClosed(channel);
169                 } else {
170                     future.setSuccess();
171                 }
172             } finally {
173                 channel.shutdownLock.unlock();
174             }
175         } catch (Throwable t) {
176             future.setFailure(t);
177             fireExceptionCaught(channel, t);
178         }
179     }
180 
181     private final class Boss implements Runnable {
182         private final OioServerSocketChannel channel;
183 
184         Boss(OioServerSocketChannel channel) {
185             this.channel = channel;
186         }
187 
188         public void run() {
189             channel.shutdownLock.lock();
190             try {
191                 while (channel.isBound()) {
192                     try {
193                         Socket acceptedSocket = channel.socket.accept();
194                         try {
195                             ChannelPipeline pipeline =
196                                 channel.getConfig().getPipelineFactory().getPipeline();
197                             final OioAcceptedSocketChannel acceptedChannel =
198                                 new OioAcceptedSocketChannel(
199                                         channel,
200                                         channel.getFactory(),
201                                         pipeline,
202                                         OioServerSocketPipelineSink.this,
203                                         acceptedSocket);
204                             DeadLockProofWorker.start(
205                                     workerExecutor,
206                                     new ThreadRenamingRunnable(
207                                             new OioWorker(acceptedChannel),
208                                             "Old I/O server worker (parentId: " +
209                                             channel.getId() + ", " + channel + ')'));
210                         } catch (Exception e) {
211                             if (logger.isWarnEnabled()) {
212                                 logger.warn(
213                                         "Failed to initialize an accepted socket.", e);
214                             }
215 
216                             try {
217                                 acceptedSocket.close();
218                             } catch (IOException e2) {
219                                 if (logger.isWarnEnabled()) {
220                                     logger.warn(
221                                             "Failed to close a partially accepted socket.",
222                                             e2);
223                                 }
224 
225                             }
226                         }
227                     } catch (SocketTimeoutException e) {
228                         // Thrown every second to stop when requested.
229                     } catch (Throwable e) {
230                         // Do not log the exception if the server socket was closed
231                         // by a user.
232                         if (!channel.socket.isBound() || channel.socket.isClosed()) {
233                             break;
234                         }
235                         if (logger.isWarnEnabled()) {
236                             logger.warn(
237                                     "Failed to accept a connection.", e);
238                         }
239                         try {
240                             Thread.sleep(1000);
241                         } catch (InterruptedException e1) {
242                             // Ignore
243                         }
244                     }
245                 }
246             } finally {
247                 channel.shutdownLock.unlock();
248             }
249         }
250     }
251 }