1 /* 2 * Copyright 2012 The Netty Project 3 * 4 * The Netty Project licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package org.jboss.netty.channel.socket.nio; 17 18 import java.nio.channels.Selector; 19 import java.util.concurrent.Executor; 20 import java.util.concurrent.Executors; 21 import java.util.concurrent.RejectedExecutionException; 22 23 import org.jboss.netty.channel.Channel; 24 import org.jboss.netty.channel.ChannelPipeline; 25 import org.jboss.netty.channel.group.ChannelGroup; 26 import org.jboss.netty.channel.socket.ClientSocketChannelFactory; 27 import org.jboss.netty.channel.socket.SocketChannel; 28 import org.jboss.netty.util.ExternalResourceReleasable; 29 import org.jboss.netty.util.HashedWheelTimer; 30 import org.jboss.netty.util.Timer; 31 import org.jboss.netty.util.internal.ExecutorUtil; 32 33 /** 34 * A {@link ClientSocketChannelFactory} which creates a client-side NIO-based 35 * {@link SocketChannel}. It utilizes the non-blocking I/O mode which was 36 * introduced with NIO to serve many number of concurrent connections 37 * efficiently. 38 * 39 * <h3>How threads work</h3> 40 * <p> 41 * There are two types of threads in a {@link NioClientSocketChannelFactory}; 42 * one is boss thread and the other is worker thread. 43 * 44 * <h4>Boss thread</h4> 45 * <p> 46 * One {@link NioClientSocketChannelFactory} has one boss thread. It makes 47 * a connection attempt on request. Once a connection attempt succeeds, 48 * the boss thread passes the connected {@link Channel} to one of the worker 49 * threads that the {@link NioClientSocketChannelFactory} manages. 50 * 51 * <h4>Worker threads</h4> 52 * <p> 53 * One {@link NioClientSocketChannelFactory} can have one or more worker 54 * threads. A worker thread performs non-blocking read and write for one or 55 * more {@link Channel}s in a non-blocking mode. 56 * 57 * <h3>Life cycle of threads and graceful shutdown</h3> 58 * <p> 59 * All threads are acquired from the {@link Executor}s which were specified 60 * when a {@link NioClientSocketChannelFactory} was created. A boss thread is 61 * acquired from the {@code bossExecutor}, and worker threads are acquired from 62 * the {@code workerExecutor}. Therefore, you should make sure the specified 63 * {@link Executor}s are able to lend the sufficient number of threads. 64 * It is the best bet to specify {@linkplain Executors#newCachedThreadPool() a cached thread pool}. 65 * <p> 66 * Both boss and worker threads are acquired lazily, and then released when 67 * there's nothing left to process. All the related resources such as 68 * {@link Selector} are also released when the boss and worker threads are 69 * released. Therefore, to shut down a service gracefully, you should do the 70 * following: 71 * 72 * <ol> 73 * <li>close all channels created by the factory usually using 74 * {@link ChannelGroup#close()}, and</li> 75 * <li>call {@link #releaseExternalResources()}.</li> 76 * </ol> 77 * 78 * Please make sure not to shut down the executor until all channels are 79 * closed. Otherwise, you will end up with a {@link RejectedExecutionException} 80 * and the related resources might not be released properly. 81 * 82 * @apiviz.landmark 83 */ 84 public class NioClientSocketChannelFactory implements ClientSocketChannelFactory { 85 86 private static final int DEFAULT_BOSS_COUNT = 1; 87 88 private final Executor bossExecutor; 89 private final WorkerPool<NioWorker> workerPool; 90 private final NioClientSocketPipelineSink sink; 91 private final Timer timer; 92 93 /** 94 * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} 95 * for the worker and boss executors. 96 * 97 * See {@link #NioClientSocketChannelFactory(Executor, Executor)} 98 */ 99 public NioClientSocketChannelFactory() { 100 this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); 101 } 102 103 /** 104 * Creates a new instance. Calling this constructor is same with calling 105 * {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with 106 * 1 and (2 * the number of available processors in the machine) for 107 * <tt>bossCount</tt> and <tt>workerCount</tt> respectively. The number of 108 * available processors is obtained by {@link Runtime#availableProcessors()}. 109 * 110 * @param bossExecutor 111 * the {@link Executor} which will execute the boss thread 112 * @param workerExecutor 113 * the {@link Executor} which will execute the worker threads 114 */ 115 public NioClientSocketChannelFactory( 116 Executor bossExecutor, Executor workerExecutor) { 117 this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, SelectorUtil.DEFAULT_IO_THREADS); 118 } 119 120 /** 121 * Creates a new instance. Calling this constructor is same with calling 122 * {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with 123 * 1 as <tt>bossCount</tt>. 124 * 125 * @param bossExecutor 126 * the {@link Executor} which will execute the boss thread 127 * @param workerExecutor 128 * the {@link Executor} which will execute the worker threads 129 * @param workerCount 130 * the maximum number of I/O worker threads 131 */ 132 public NioClientSocketChannelFactory( 133 Executor bossExecutor, Executor workerExecutor, int workerCount) { 134 this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount); 135 } 136 137 /** 138 * Creates a new instance. 139 * 140 * @param bossExecutor 141 * the {@link Executor} which will execute the boss thread 142 * @param workerExecutor 143 * the {@link Executor} which will execute the worker threads 144 * @param bossCount 145 * the maximum number of boss threads 146 * @param workerCount 147 * the maximum number of I/O worker threads 148 */ 149 public NioClientSocketChannelFactory( 150 Executor bossExecutor, Executor workerExecutor, 151 int bossCount, int workerCount) { 152 this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount)); 153 } 154 155 /** 156 * Creates a new instance. 157 * 158 * @param bossExecutor 159 * the {@link Executor} which will execute the boss thread 160 * @param bossCount 161 * the maximum number of boss threads 162 * @param workerPool 163 * the {@link WorkerPool} to use to do the IO 164 */ 165 public NioClientSocketChannelFactory( 166 Executor bossExecutor, int bossCount, 167 WorkerPool<NioWorker> workerPool) { 168 this(bossExecutor, bossCount, workerPool, new HashedWheelTimer()); 169 } 170 171 /** 172 * Creates a new instance. 173 * 174 * @param bossExecutor 175 * the {@link Executor} which will execute the boss thread 176 * @param bossCount 177 * the maximum number of boss threads 178 * @param workerPool 179 * the {@link WorkerPool} to use to do the IO 180 * @param timer 181 * the {@link Timer} to use to handle the connection timeouts 182 */ 183 public NioClientSocketChannelFactory( 184 Executor bossExecutor, int bossCount, 185 WorkerPool<NioWorker> workerPool, Timer timer) { 186 187 if (bossExecutor == null) { 188 throw new NullPointerException("bossExecutor"); 189 } 190 if (workerPool == null) { 191 throw new NullPointerException("workerPool"); 192 } 193 if (bossCount <= 0) { 194 throw new IllegalArgumentException( 195 "bossCount (" + bossCount + ") " + 196 "must be a positive integer."); 197 } 198 199 200 this.bossExecutor = bossExecutor; 201 this.workerPool = workerPool; 202 this.timer = timer; 203 sink = new NioClientSocketPipelineSink( 204 bossExecutor, bossCount, workerPool, timer); 205 } 206 207 public SocketChannel newChannel(ChannelPipeline pipeline) { 208 return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker()); 209 } 210 211 public void releaseExternalResources() { 212 ExecutorUtil.terminate(bossExecutor); 213 timer.stop(); 214 if (workerPool instanceof ExternalResourceReleasable) { 215 ((ExternalResourceReleasable) workerPool).releaseExternalResources(); 216 } 217 } 218 }