1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.channel.socket.nio;
17
18 import java.nio.channels.Selector;
19 import java.util.concurrent.Executor;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.RejectedExecutionException;
22
23 import org.jboss.netty.channel.Channel;
24 import org.jboss.netty.channel.ChannelPipeline;
25 import org.jboss.netty.channel.group.ChannelGroup;
26 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
27 import org.jboss.netty.channel.socket.SocketChannel;
28 import org.jboss.netty.util.ExternalResourceReleasable;
29 import org.jboss.netty.util.Timer;
30
31 /**
32 * A {@link ClientSocketChannelFactory} which creates a client-side NIO-based
33 * {@link SocketChannel}. It utilizes the non-blocking I/O mode which was
34 * introduced with NIO to serve many number of concurrent connections
35 * efficiently.
36 *
37 * <h3>How threads work</h3>
38 * <p>
39 * There are two types of threads in a {@link NioClientSocketChannelFactory};
40 * one is boss thread and the other is worker thread.
41 *
42 * <h4>Boss thread</h4>
43 * <p>
44 * One {@link NioClientSocketChannelFactory} has one boss thread. It makes
45 * a connection attempt on request. Once a connection attempt succeeds,
46 * the boss thread passes the connected {@link Channel} to one of the worker
47 * threads that the {@link NioClientSocketChannelFactory} manages.
48 *
49 * <h4>Worker threads</h4>
50 * <p>
51 * One {@link NioClientSocketChannelFactory} can have one or more worker
52 * threads. A worker thread performs non-blocking read and write for one or
53 * more {@link Channel}s in a non-blocking mode.
54 *
55 * <h3>Life cycle of threads and graceful shutdown</h3>
56 * <p>
57 * All threads are acquired from the {@link Executor}s which were specified
58 * when a {@link NioClientSocketChannelFactory} was created. A boss thread is
59 * acquired from the {@code bossExecutor}, and worker threads are acquired from
60 * the {@code workerExecutor}. Therefore, you should make sure the specified
61 * {@link Executor}s are able to lend the sufficient number of threads.
62 * It is the best bet to specify {@linkplain Executors#newCachedThreadPool() a cached thread pool}.
63 * <p>
64 * Both boss and worker threads are acquired lazily, and then released when
65 * there's nothing left to process. All the related resources such as
66 * {@link Selector} are also released when the boss and worker threads are
67 * released. Therefore, to shut down a service gracefully, you should do the
68 * following:
69 *
70 * <ol>
71 * <li>close all channels created by the factory usually using
72 * {@link ChannelGroup#close()}, and</li>
73 * <li>call {@link #releaseExternalResources()}.</li>
74 * </ol>
75 *
76 * Please make sure not to shut down the executor until all channels are
77 * closed. Otherwise, you will end up with a {@link RejectedExecutionException}
78 * and the related resources might not be released properly.
79 *
80 * @apiviz.landmark
81 */
82 public class NioClientSocketChannelFactory implements ClientSocketChannelFactory {
83
84 private static final int DEFAULT_BOSS_COUNT = 1;
85
86 private final BossPool<NioClientBoss> bossPool;
87 private final WorkerPool<NioWorker> workerPool;
88 private final NioClientSocketPipelineSink sink;
89 private boolean releasePools;
90
91 /**
92 * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()}
93 * for the worker and boss executors.
94 *
95 * See {@link #NioClientSocketChannelFactory(Executor, Executor)}
96 */
97 public NioClientSocketChannelFactory() {
98 this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
99 releasePools = true;
100 }
101
102 /**
103 * Creates a new instance. Calling this constructor is same with calling
104 * {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with
105 * 1 and (2 * the number of available processors in the machine) for
106 * <tt>bossCount</tt> and <tt>workerCount</tt> respectively. The number of
107 * available processors is obtained by {@link Runtime#availableProcessors()}.
108 *
109 * @param bossExecutor
110 * the {@link Executor} which will execute the boss thread
111 * @param workerExecutor
112 * the {@link Executor} which will execute the worker threads
113 */
114 public NioClientSocketChannelFactory(
115 Executor bossExecutor, Executor workerExecutor) {
116 this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, SelectorUtil.DEFAULT_IO_THREADS);
117 }
118
119 /**
120 * Creates a new instance. Calling this constructor is same with calling
121 * {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with
122 * 1 as <tt>bossCount</tt>.
123 *
124 * @param bossExecutor
125 * the {@link Executor} which will execute the boss thread
126 * @param workerExecutor
127 * the {@link Executor} which will execute the worker threads
128 * @param workerCount
129 * the maximum number of I/O worker threads
130 */
131 public NioClientSocketChannelFactory(
132 Executor bossExecutor, Executor workerExecutor, int workerCount) {
133 this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount);
134 }
135
136 /**
137 * Creates a new instance.
138 *
139 * @param bossExecutor
140 * the {@link Executor} which will execute the boss thread
141 * @param workerExecutor
142 * the {@link Executor} which will execute the worker threads
143 * @param bossCount
144 * the maximum number of boss threads
145 * @param workerCount
146 * the maximum number of I/O worker threads
147 */
148 public NioClientSocketChannelFactory(
149 Executor bossExecutor, Executor workerExecutor,
150 int bossCount, int workerCount) {
151 this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount));
152 }
153
154 /**
155 * Creates a new instance.
156 *
157 * @param bossExecutor
158 * the {@link Executor} which will execute the boss thread
159 * @param bossCount
160 * the maximum number of boss threads
161 * @param workerPool
162 * the {@link WorkerPool} to use to do the IO
163 */
164 public NioClientSocketChannelFactory(
165 Executor bossExecutor, int bossCount,
166 WorkerPool<NioWorker> workerPool) {
167 this(new NioClientBossPool(bossExecutor, bossCount), workerPool);
168 }
169
170 /**
171 * Creates a new instance.
172 *
173 * @param bossExecutor
174 * the {@link Executor} which will execute the boss thread
175 * @param bossCount
176 * the maximum number of boss threads
177 * @param workerPool
178 * the {@link WorkerPool} to use to do the IO
179 * @param timer
180 * the {@link Timer} to use to handle the connection timeouts
181 */
182 public NioClientSocketChannelFactory(
183 Executor bossExecutor, int bossCount,
184 WorkerPool<NioWorker> workerPool, Timer timer) {
185 this(new NioClientBossPool(bossExecutor, bossCount, timer, null), workerPool);
186 }
187
188 /**
189 * Creates a new instance.
190 *
191 * @param bossPool
192 * the {@link BossPool} to use to handle the connects
193 * @param workerPool
194 * the {@link WorkerPool} to use to do the IO
195 */
196 public NioClientSocketChannelFactory(
197 BossPool<NioClientBoss> bossPool,
198 WorkerPool<NioWorker> workerPool) {
199
200 if (bossPool == null) {
201 throw new NullPointerException("bossPool");
202 }
203 if (workerPool == null) {
204 throw new NullPointerException("workerPool");
205 }
206 this.bossPool = bossPool;
207 this.workerPool = workerPool;
208 sink = new NioClientSocketPipelineSink(bossPool);
209 }
210
211 public SocketChannel newChannel(ChannelPipeline pipeline) {
212 return new NioClientSocketChannel(this, pipeline, sink, workerPool.nextWorker());
213 }
214
215 public void shutdown() {
216 bossPool.shutdown();
217 workerPool.shutdown();
218 if (releasePools) {
219 releasePools();
220 }
221 }
222
223 public void releaseExternalResources() {
224 shutdown();
225 releasePools();
226 }
227
228 private void releasePools() {
229 if (bossPool instanceof ExternalResourceReleasable) {
230 ((ExternalResourceReleasable) bossPool).releaseExternalResources();
231 }
232 if (workerPool instanceof ExternalResourceReleasable) {
233 ((ExternalResourceReleasable) workerPool).releaseExternalResources();
234 }
235 }
236 }