View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import java.nio.channels.Selector;
19  import java.util.concurrent.Executor;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.RejectedExecutionException;
22  
23  import org.jboss.netty.channel.Channel;
24  import org.jboss.netty.channel.ChannelPipeline;
25  import org.jboss.netty.channel.group.ChannelGroup;
26  import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
27  import org.jboss.netty.channel.socket.SocketChannel;
28  import org.jboss.netty.util.ExternalResourceReleasable;
29  import org.jboss.netty.util.HashedWheelTimer;
30  import org.jboss.netty.util.Timer;
31  import org.jboss.netty.util.internal.ExecutorUtil;
32  
33  /**
34   * A {@link ClientSocketChannelFactory} which creates a client-side NIO-based
35   * {@link SocketChannel}.  It utilizes the non-blocking I/O mode which was
36   * introduced with NIO to serve many number of concurrent connections
37   * efficiently.
38   *
39   * <h3>How threads work</h3>
40   * <p>
41   * There are two types of threads in a {@link NioClientSocketChannelFactory};
42   * one is boss thread and the other is worker thread.
43   *
44   * <h4>Boss thread</h4>
45   * <p>
46   * One {@link NioClientSocketChannelFactory} has one boss thread.  It makes
47   * a connection attempt on request.  Once a connection attempt succeeds,
48   * the boss thread passes the connected {@link Channel} to one of the worker
49   * threads that the {@link NioClientSocketChannelFactory} manages.
50   *
51   * <h4>Worker threads</h4>
52   * <p>
53   * One {@link NioClientSocketChannelFactory} can have one or more worker
54   * threads.  A worker thread performs non-blocking read and write for one or
55   * more {@link Channel}s in a non-blocking mode.
56   *
57   * <h3>Life cycle of threads and graceful shutdown</h3>
58   * <p>
59   * All threads are acquired from the {@link Executor}s which were specified
60   * when a {@link NioClientSocketChannelFactory} was created.  A boss thread is
61   * acquired from the {@code bossExecutor}, and worker threads are acquired from
62   * the {@code workerExecutor}.  Therefore, you should make sure the specified
63   * {@link Executor}s are able to lend the sufficient number of threads.
64   * It is the best bet to specify {@linkplain Executors#newCachedThreadPool() a cached thread pool}.
65   * <p>
66   * Both boss and worker threads are acquired lazily, and then released when
67   * there's nothing left to process.  All the related resources such as
68   * {@link Selector} are also released when the boss and worker threads are
69   * released.  Therefore, to shut down a service gracefully, you should do the
70   * following:
71   *
72   * <ol>
73   * <li>close all channels created by the factory usually using
74   *     {@link ChannelGroup#close()}, and</li>
75   * <li>call {@link #releaseExternalResources()}.</li>
76   * </ol>
77   *
78   * Please make sure not to shut down the executor until all channels are
79   * closed.  Otherwise, you will end up with a {@link RejectedExecutionException}
80   * and the related resources might not be released properly.
81   *
82   * @apiviz.landmark
83   */
84  public class NioClientSocketChannelFactory implements ClientSocketChannelFactory {
85  
86      private static final int DEFAULT_BOSS_COUNT = 1;
87  
88      private final Executor bossExecutor;
89      private final WorkerPool<NioWorker> workerPool;
90      private final NioClientSocketPipelineSink sink;
91      private final Timer timer;
92  
93      /**
94       * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()}
95       * for the worker and boss executors.
96       *
97       * See {@link #NioClientSocketChannelFactory(Executor, Executor)}
98       */
99      public NioClientSocketChannelFactory() {
100         this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
101     }
102 
103     /**
104      * Creates a new instance.  Calling this constructor is same with calling
105      * {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with
106      * 1 and (2 * the number of available processors in the machine) for
107      * <tt>bossCount</tt> and <tt>workerCount</tt> respectively.  The number of
108      * available processors is obtained by {@link Runtime#availableProcessors()}.
109      *
110      * @param bossExecutor
111      *        the {@link Executor} which will execute the boss thread
112      * @param workerExecutor
113      *        the {@link Executor} which will execute the worker threads
114      */
115     public NioClientSocketChannelFactory(
116             Executor bossExecutor, Executor workerExecutor) {
117         this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, SelectorUtil.DEFAULT_IO_THREADS);
118     }
119 
120     /**
121      * Creates a new instance.  Calling this constructor is same with calling
122      * {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with
123      * 1 as <tt>bossCount</tt>.
124      *
125      * @param bossExecutor
126      *        the {@link Executor} which will execute the boss thread
127      * @param workerExecutor
128      *        the {@link Executor} which will execute the worker threads
129      * @param workerCount
130      *        the maximum number of I/O worker threads
131      */
132     public NioClientSocketChannelFactory(
133             Executor bossExecutor, Executor workerExecutor, int workerCount) {
134         this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount);
135     }
136 
137     /**
138      * Creates a new instance.
139      *
140      * @param bossExecutor
141      *        the {@link Executor} which will execute the boss thread
142      * @param workerExecutor
143      *        the {@link Executor} which will execute the worker threads
144      * @param bossCount
145      *        the maximum number of boss threads
146      * @param workerCount
147      *        the maximum number of I/O worker threads
148      */
149     public NioClientSocketChannelFactory(
150             Executor bossExecutor, Executor workerExecutor,
151             int bossCount, int workerCount) {
152         this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount));
153     }
154 
155     /**
156      * Creates a new instance.
157      *
158      * @param bossExecutor
159      *        the {@link Executor} which will execute the boss thread
160      * @param bossCount
161      *        the maximum number of boss threads
162      * @param workerPool
163      *        the {@link WorkerPool} to use to do the IO
164      */
165     public NioClientSocketChannelFactory(
166             Executor bossExecutor, int bossCount,
167             WorkerPool<NioWorker> workerPool) {
168         this(bossExecutor, bossCount, workerPool, new HashedWheelTimer());
169     }
170 
171     /**
172      * Creates a new instance.
173      *
174      * @param bossExecutor
175      *        the {@link Executor} which will execute the boss thread
176      * @param bossCount
177      *        the maximum number of boss threads
178      * @param workerPool
179      *        the {@link WorkerPool} to use to do the IO
180      * @param timer
181      *        the {@link Timer} to use to handle the connection timeouts
182      */
183     public NioClientSocketChannelFactory(
184             Executor bossExecutor, int bossCount,
185             WorkerPool<NioWorker> workerPool, Timer timer) {
186 
187         if (bossExecutor == null) {
188             throw new NullPointerException("bossExecutor");
189         }
190         if (workerPool == null) {
191             throw new NullPointerException("workerPool");
192         }
193         if (bossCount <= 0) {
194             throw new IllegalArgumentException(
195                     "bossCount (" + bossCount + ") " +
196                     "must be a positive integer.");
197         }
198 
199 
200         this.bossExecutor = bossExecutor;
201         this.workerPool = workerPool;
202         this.timer = timer;
203         sink = new NioClientSocketPipelineSink(
204                 bossExecutor, bossCount, workerPool, timer);
205     }
206 
207     public SocketChannel newChannel(ChannelPipeline pipeline) {
208         return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker());
209     }
210 
211     public void releaseExternalResources() {
212         ExecutorUtil.terminate(bossExecutor);
213         timer.stop();
214         if (workerPool instanceof ExternalResourceReleasable) {
215             ((ExternalResourceReleasable) workerPool).releaseExternalResources();
216         }
217     }
218 }