View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import java.nio.channels.Selector;
19  import java.util.concurrent.Executor;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.RejectedExecutionException;
22  import java.util.concurrent.ThreadPoolExecutor;
23  
24  import org.jboss.netty.channel.Channel;
25  import org.jboss.netty.channel.ChannelPipeline;
26  import org.jboss.netty.channel.group.ChannelGroup;
27  import org.jboss.netty.channel.socket.ServerSocketChannel;
28  import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
29  import org.jboss.netty.util.ExternalResourceReleasable;
30  
31  /**
32   * A {@link ServerSocketChannelFactory} which creates a server-side NIO-based
33   * {@link ServerSocketChannel}.  It utilizes the non-blocking I/O mode which
34   * was introduced with NIO to serve many number of concurrent connections
35   * efficiently.
36   *
37   * <h3>How threads work</h3>
38   * <p>
39   * There are two types of threads in a {@link NioServerSocketChannelFactory};
40   * one is boss thread and the other is worker thread.
41   *
42   * <h4>Boss threads</h4>
43   * <p>
44   * Each bound {@link ServerSocketChannel} has its own boss thread.
45   * For example, if you opened two server ports such as 80 and 443, you will
46   * have two boss threads.  A boss thread accepts incoming connections until
47   * the port is unbound.  Once a connection is accepted successfully, the boss
48   * thread passes the accepted {@link Channel} to one of the worker
49   * threads that the {@link NioServerSocketChannelFactory} manages.
50   *
51   * <h4>Worker threads</h4>
52   * <p>
53   * One {@link NioServerSocketChannelFactory} can have one or more worker
54   * threads.  A worker thread performs non-blocking read and write for one or
55   * more {@link Channel}s in a non-blocking mode.
56   *
57   * <h3>Life cycle of threads and graceful shutdown</h3>
58   * <p>
59   * All threads are acquired from the {@link Executor}s which were specified
60   * when a {@link NioServerSocketChannelFactory} was created.  Boss threads are
61   * acquired from the {@code bossExecutor}, and worker threads are acquired from
62   * the {@code workerExecutor}.  Therefore, you should make sure the specified
63   * {@link Executor}s are able to lend the sufficient number of threads.
64   * It is the best bet to specify {@linkplain Executors#newCachedThreadPool() a cached thread pool}.
65   * <p>
66   * Both boss and worker threads are acquired lazily, and then released when
67   * there's nothing left to process.  All the related resources such as
68   * {@link Selector} are also released when the boss and worker threads are
69   * released.  Therefore, to shut down a service gracefully, you should do the
70   * following:
71   *
72   * <ol>
73   * <li>unbind all channels created by the factory,
74   * <li>close all child channels accepted by the unbound channels, and
75   *     (these two steps so far is usually done using {@link ChannelGroup#close()})</li>
76   * <li>call {@link #releaseExternalResources()}.</li>
77   * </ol>
78   *
79   * Please make sure not to shut down the executor until all channels are
80   * closed.  Otherwise, you will end up with a {@link RejectedExecutionException}
81   * and the related resources might not be released properly.
82   *
83   * @apiviz.landmark
84   */
85  public class NioServerSocketChannelFactory implements ServerSocketChannelFactory {
86  
87      private final WorkerPool<NioWorker> workerPool;
88      private final NioServerSocketPipelineSink sink;
89      private final BossPool<NioServerBoss> bossPool;
90      private boolean releasePools;
91  
92      /**
93       * Create a new {@link NioServerSocketChannelFactory} using {@link Executors#newCachedThreadPool()}
94       * for the boss and worker.
95       *
96       * See {@link #NioServerSocketChannelFactory(Executor, Executor)}
97       */
98      public NioServerSocketChannelFactory() {
99          this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
100         releasePools = true;
101     }
102 
103     /**
104      * Creates a new instance.  Calling this constructor is same with calling
105      * {@link #NioServerSocketChannelFactory(Executor, Executor, int)} with
106      * the worker executor passed into {@link #getMaxThreads(Executor)}.
107      *
108      * @param bossExecutor
109      *        the {@link Executor} which will execute the boss threads
110      * @param workerExecutor
111      *        the {@link Executor} which will execute the I/O worker threads
112      */
113     public NioServerSocketChannelFactory(
114             Executor bossExecutor, Executor workerExecutor) {
115         this(bossExecutor, workerExecutor, getMaxThreads(workerExecutor));
116     }
117 
118     /**
119      * Creates a new instance.
120      *
121      * @param bossExecutor
122      *        the {@link Executor} which will execute the boss threads
123      * @param workerExecutor
124      *        the {@link Executor} which will execute the I/O worker threads
125      * @param workerCount
126      *        the maximum number of I/O worker threads
127      */
128     public NioServerSocketChannelFactory(
129             Executor bossExecutor, Executor workerExecutor,
130             int workerCount) {
131         this(bossExecutor, 1, workerExecutor, workerCount);
132     }
133 
134     /**
135      * Create a new instance.
136      *
137      * @param bossExecutor
138      *        the {@link Executor} which will execute the boss threads
139      * @param bossCount
140      *        the number of boss threads
141      * @param workerExecutor
142      *        the {@link Executor} which will execute the I/O worker threads
143      * @param workerCount
144      *        the maximum number of I/O worker threads
145      */
146     public NioServerSocketChannelFactory(
147             Executor bossExecutor, int bossCount, Executor workerExecutor,
148             int workerCount) {
149         this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount));
150     }
151 
152     /**
153      * Creates a new instance.
154      *
155      * @param bossExecutor
156      *        the {@link Executor} which will execute the boss threads
157      * @param workerPool
158      *        the {@link WorkerPool} which will be used to obtain the {@link NioWorker} that execute
159      *        the I/O worker threads
160      */
161     public NioServerSocketChannelFactory(
162             Executor bossExecutor, WorkerPool<NioWorker> workerPool) {
163         this(bossExecutor, 1 , workerPool);
164     }
165 
166     /**
167      * Create a new instance.
168      *
169      * @param bossExecutor
170      *        the {@link Executor} which will execute the boss threads
171      * @param bossCount
172      *        the number of boss threads
173      * @param workerPool
174      *        the {@link WorkerPool} which will be used to obtain the {@link NioWorker} that execute
175      *        the I/O worker threads
176      */
177     public NioServerSocketChannelFactory(
178             Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) {
179         this(new NioServerBossPool(bossExecutor, bossCount, null), workerPool);
180     }
181 
182     /**
183      * Create a new instance.
184      *
185      * @param bossPool
186      *        the {@link BossPool} which will be used to obtain the {@link NioServerBoss} that execute
187      *        the I/O for accept new connections
188      * @param workerPool
189      *        the {@link WorkerPool} which will be used to obtain the {@link NioWorker} that execute
190      *        the I/O worker threads
191      */
192     public NioServerSocketChannelFactory(BossPool<NioServerBoss> bossPool, WorkerPool<NioWorker> workerPool) {
193         if (bossPool == null) {
194             throw new NullPointerException("bossExecutor");
195         }
196         if (workerPool == null) {
197             throw new NullPointerException("workerPool");
198         }
199         this.bossPool = bossPool;
200         this.workerPool = workerPool;
201         sink = new NioServerSocketPipelineSink();
202     }
203 
204     public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
205         return new NioServerSocketChannel(this, pipeline, sink, bossPool.nextBoss(), workerPool);
206     }
207 
208     public void shutdown() {
209         bossPool.shutdown();
210         workerPool.shutdown();
211         if (releasePools) {
212             releasePools();
213         }
214     }
215 
216     public void releaseExternalResources() {
217         bossPool.shutdown();
218         workerPool.shutdown();
219         releasePools();
220     }
221 
222     private void releasePools() {
223         if (bossPool instanceof ExternalResourceReleasable) {
224             ((ExternalResourceReleasable) bossPool).releaseExternalResources();
225         }
226         if (workerPool instanceof ExternalResourceReleasable) {
227             ((ExternalResourceReleasable) workerPool).releaseExternalResources();
228         }
229     }
230 
231     /**
232      * Returns number of max threads for the {@link NioWorkerPool} to use. If
233      * the * {@link Executor} is a {@link ThreadPoolExecutor}, check its
234      * maximum * pool size and return either it's maximum or
235      * {@link SelectorUtil#DEFAULT_IO_THREADS}, whichever is lower. Note that
236      * {@link SelectorUtil#DEFAULT_IO_THREADS} is 2 * the number of available
237      * processors in the machine.  The number of available processors is
238      * obtained by {@link Runtime#availableProcessors()}.
239      *
240      * @param executor
241      *        the {@link Executor} which will execute the I/O worker threads
242      * @return
243      *        number of maximum threads the NioWorkerPool should use
244      */
245     private static int getMaxThreads(Executor executor) {
246         if (executor instanceof ThreadPoolExecutor) {
247             final int maxThreads = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
248             return Math.min(maxThreads, SelectorUtil.DEFAULT_IO_THREADS);
249         }
250         return SelectorUtil.DEFAULT_IO_THREADS;
251     }
252 }