1 /* 2 * Copyright 2012 The Netty Project 3 * 4 * The Netty Project licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * https://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package io.netty5.channel; 17 18 import io.netty5.util.NettyRuntime; 19 import io.netty5.util.concurrent.DefaultThreadFactory; 20 import io.netty5.util.concurrent.MultithreadEventExecutorGroup; 21 import io.netty5.util.concurrent.RejectedExecutionHandler; 22 import io.netty5.util.concurrent.RejectedExecutionHandlers; 23 import io.netty5.util.concurrent.ThreadPerTaskExecutor; 24 import io.netty5.util.internal.EmptyArrays; 25 import io.netty5.util.internal.SystemPropertyUtil; 26 import io.netty5.util.internal.logging.InternalLogger; 27 import io.netty5.util.internal.logging.InternalLoggerFactory; 28 29 import java.util.ArrayList; 30 import java.util.Arrays; 31 import java.util.Collections; 32 import java.util.List; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.ThreadFactory; 35 36 /** 37 * {@link EventLoopGroup} implementation that will handle its tasks with multiple threads. 38 */ 39 public class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { 40 41 private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class); 42 43 public static final int DEFAULT_EVENT_LOOP_THREADS; 44 45 static { 46 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( 47 "io.netty5.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); 48 49 if (logger.isDebugEnabled()) { 50 logger.debug("-Dio.netty5.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); 51 } 52 } 53 54 /** 55 * Create a new instance. 56 * 57 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new 58 * {@link IoHandler} instances that will handle the IO for the 59 * {@link EventLoop}. 60 */ 61 public MultithreadEventLoopGroup(IoHandlerFactory ioHandlerFactory) { 62 this(0, (Executor) null, ioHandlerFactory); 63 } 64 65 /** 66 * Create a new instance. 67 * 68 * @param nThreads the number of threads that will be used by this instance. 69 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new 70 * {@link IoHandler} instances that will handle the IO for the 71 * {@link EventLoop}. 72 */ 73 public MultithreadEventLoopGroup(int nThreads, IoHandlerFactory ioHandlerFactory) { 74 this(nThreads, (Executor) null, ioHandlerFactory); 75 } 76 77 /** 78 * Create a new instance. 79 * 80 * @param nThreads the number of threads that will be used by this instance. 81 * @param executor the {@link Executor} to use, or {@code null} if the default should be used. 82 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new 83 * {@link IoHandler} instances that will handle the IO for the 84 * {@link EventLoop}. 85 */ 86 public MultithreadEventLoopGroup(int nThreads, Executor executor, 87 IoHandlerFactory ioHandlerFactory) { 88 this(nThreads, executor, ioHandlerFactory, 89 SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject(), 90 SingleThreadEventLoop.DEFAULT_MAX_TASKS_PER_RUN); 91 } 92 93 /** 94 * Create a new instance. 95 * 96 * @param nThreads the number of threads that will be used by this instance. 97 * @param threadFactory the {@link ThreadFactory} to use, or {@code null} if the default should be used. 98 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new 99 * {@link IoHandler} instances that will handle the IO for the 100 * {@link EventLoop}. 101 */ 102 public MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, 103 IoHandlerFactory ioHandlerFactory) { 104 this(nThreads, threadFactory, ioHandlerFactory, 105 SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject()); 106 } 107 108 /** 109 * Create a new instance. 110 * 111 * @param executor the {@link Executor} to use, or {@code null} if the default should be used. 112 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new 113 * {@link IoHandler} instances that will handle the IO for the 114 * {@link EventLoop}. 115 */ 116 public MultithreadEventLoopGroup(Executor executor, 117 IoHandlerFactory ioHandlerFactory) { 118 this(0, executor, ioHandlerFactory, 119 SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject(), 120 SingleThreadEventLoop.DEFAULT_MAX_TASKS_PER_RUN); 121 } 122 123 /** 124 * Create a new instance. 125 * 126 * @param threadFactory the {@link ThreadFactory} to use, or {@code null} if the default should be used. 127 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new 128 * {@link IoHandler} instances that will handle the IO for the 129 * {@link EventLoop}. 130 */ 131 public MultithreadEventLoopGroup(ThreadFactory threadFactory, 132 IoHandlerFactory ioHandlerFactory) { 133 this(0, threadFactory, ioHandlerFactory, 134 SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject()); 135 } 136 137 /** 138 * Create a new instance. 139 * 140 * @param nThreads the number of threads that will be used by this instance. 141 * @param executor the {@link Executor} to use, or {@code null} if the default should be used. 142 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new 143 * {@link IoHandler} instances that will handle the IO for the 144 * {@link EventLoop}. 145 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. 146 * @param rejectedHandler the {@link RejectedExecutionHandler} to use. 147 */ 148 public MultithreadEventLoopGroup(int nThreads, Executor executor, 149 IoHandlerFactory ioHandlerFactory, 150 int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { 151 this(nThreads, executor, ioHandlerFactory, maxPendingTasks, rejectedHandler, 152 SingleThreadEventLoop.DEFAULT_MAX_TASKS_PER_RUN); 153 } 154 155 /** 156 * Create a new instance. 157 * 158 * @param nThreads the number of threads that will be used by this instance. 159 * @param threadFactory the {@link ThreadFactory} to use, or {@code null} if the default should be used. 160 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new 161 * {@link IoHandler} instances that will handle the IO for the 162 * {@link EventLoop}. 163 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. 164 * @param rejectedHandler the {@link RejectedExecutionHandler} to use. 165 */ 166 public MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, 167 IoHandlerFactory ioHandlerFactory, 168 int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { 169 this(nThreads, threadFactory, ioHandlerFactory, maxPendingTasks, rejectedHandler, 170 SingleThreadEventLoop.DEFAULT_MAX_TASKS_PER_RUN); 171 } 172 173 /** 174 * Create a new instance. 175 * 176 * @param nThreads the number of threads that will be used by this instance. 177 * @param executor the {@link Executor} to use, or {@code null} if the default should be used. 178 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new 179 * {@link IoHandler} instances that will handle the IO for the 180 * {@link EventLoop}. 181 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. 182 * @param rejectedHandler the {@link RejectedExecutionHandler} to use. 183 * @param maxTasksPerRun the maximum number of tasks per {@link EventLoop} run that will be processed 184 * before trying to handle IO again. 185 */ 186 public MultithreadEventLoopGroup(int nThreads, Executor executor, 187 IoHandlerFactory ioHandlerFactory, 188 int maxPendingTasks, RejectedExecutionHandler rejectedHandler, 189 int maxTasksPerRun) { 190 this(nThreads, executor, ioHandlerFactory, 191 maxPendingTasks, rejectedHandler, maxTasksPerRun, EmptyArrays.EMPTY_OBJECTS); 192 } 193 194 /** 195 * Create a new instance. 196 * 197 * @param nThreads the number of threads that will be used by this instance. 198 * @param threadFactory the {@link ThreadFactory} to use, or {@code null} if the default should be used. 199 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new 200 * {@link IoHandler} instances that will handle the IO for the 201 * {@link EventLoop}. 202 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. 203 * @param rejectedHandler the {@link RejectedExecutionHandler} to use. 204 * @param maxTasksPerRun the maximum number of tasks per {@link EventLoop} run that will be processed 205 * before trying to handle IO again. 206 */ 207 public MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, 208 IoHandlerFactory ioHandlerFactory, 209 int maxPendingTasks, RejectedExecutionHandler rejectedHandler, 210 int maxTasksPerRun) { 211 this(nThreads, threadFactory, ioHandlerFactory, 212 maxPendingTasks, rejectedHandler, maxTasksPerRun, EmptyArrays.EMPTY_OBJECTS); 213 } 214 215 // Constructors provided for sub-classes that want to pass more args to newChild(...). 216 217 /** 218 * Create a new instance. 219 * 220 * @param nThreads the number of threads that will be used by this instance. 221 * @param executor the {@link Executor} to use, or {@code null} if the default should be used. 222 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new 223 * {@link IoHandler} instances that will handle the IO for the 224 * {@link EventLoop}. 225 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. 226 * @param rejectedHandler the {@link RejectedExecutionHandler} to use. 227 * @param maxTasksPerRun the maximum number of tasks per {@link EventLoop} run that will be processed 228 * before trying to handle IO again. 229 * @param args extra arguments passed to {@link #newChild(Executor, int, RejectedExecutionHandler, 230 * IoHandler, int, Object...)} 231 */ 232 protected MultithreadEventLoopGroup(int nThreads, Executor executor, 233 IoHandlerFactory ioHandlerFactory, 234 int maxPendingTasks, RejectedExecutionHandler rejectedHandler, 235 int maxTasksPerRun, Object... args) { 236 super(pickThreadCount(nThreads), 237 executor == null ? new ThreadPerTaskExecutor(newDefaultThreadFactory()) : executor, 238 maxPendingTasks, rejectedHandler, merge(ioHandlerFactory, maxTasksPerRun, args)); 239 } 240 241 /** 242 * Create a new instance. 243 * 244 * @param nThreads the number of threads that will be used by this instance. 245 * @param threadFactory the {@link ThreadFactory} to use, or {@code null} if the default should be used. 246 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new 247 * {@link IoHandler} instances that will handle the IO for the 248 * {@link EventLoop}. 249 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. 250 * @param rejectedHandler the {@link RejectedExecutionHandler} to use. 251 * @param maxTasksPerRun the maximum number of tasks per {@link EventLoop} run that will be processed 252 * before trying to handle IO again. 253 * @param args extra arguments passed to {@link #newChild(Executor, int, RejectedExecutionHandler, 254 * IoHandler, int, Object...)} 255 */ 256 protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, 257 IoHandlerFactory ioHandlerFactory, 258 int maxPendingTasks, RejectedExecutionHandler rejectedHandler, 259 int maxTasksPerRun, Object... args) { 260 super(pickThreadCount(nThreads), threadFactory == null ? newDefaultThreadFactory() : threadFactory, 261 maxPendingTasks, rejectedHandler, merge(ioHandlerFactory, maxTasksPerRun, args)); 262 } 263 264 private static ThreadFactory newDefaultThreadFactory() { 265 return new DefaultThreadFactory(MultithreadEventLoopGroup.class, Thread.MAX_PRIORITY); 266 } 267 268 /** 269 * Return the number of threads to use based on the given {@code nThreads}. 270 */ 271 protected static int pickThreadCount(int nThreads) { 272 return nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads; 273 } 274 275 private static Object[] merge(IoHandlerFactory ioHandlerFactory, 276 int maxTasksPerRun, Object... args) { 277 List<Object> argList = new ArrayList<>(2 + args.length); 278 argList.add(ioHandlerFactory); 279 argList.add(maxTasksPerRun); 280 Collections.addAll(argList, args); 281 return argList.toArray(); 282 } 283 284 @Override 285 public final EventLoop next() { 286 return (EventLoop) super.next(); 287 } 288 289 @Override 290 protected final EventLoop newChild(Executor executor, int maxPendingTasks, 291 RejectedExecutionHandler rejectedExecutionHandler, Object... args) { 292 return newChild(executor, maxPendingTasks, rejectedExecutionHandler, 293 ((IoHandlerFactory) args[0]).newHandler(), (Integer) args[1], 294 Arrays.copyOfRange(args, 2, args.length)); 295 } 296 297 /** 298 * Creates a new {@link EventLoop} to use. 299 * 300 * As this method is called from within the constructor you can only use the parameters passed into the method when 301 * overriding this method. 302 * 303 * @param executor the {@link Executor} to use for execution. 304 * @param maxPendingTasks the maximum number of pending tasks. 305 * @param rejectedExecutionHandler the {@link RejectedExecutionHandler} to use when the number of outstanding tasks 306 * reach {@code maxPendingTasks}. 307 * @param ioHandler the {@link IoHandler} to use. 308 * @param maxTasksPerRun the maximum number of tasks per {@link EventLoop} run that will be processed 309 * before trying to handle IO again. 310 * @param args any extra args needed to construct the {@link EventLoop}. This will be an empty 311 * array if not sub-classes and extra arguments are given. 312 * @return the {@link EventLoop} to use. 313 */ 314 protected EventLoop newChild(Executor executor, int maxPendingTasks, 315 RejectedExecutionHandler rejectedExecutionHandler, 316 IoHandler ioHandler, int maxTasksPerRun, 317 Object... args) { 318 assert args.length == 0; 319 return new SingleThreadEventLoop(executor, ioHandler, maxPendingTasks, 320 rejectedExecutionHandler, maxTasksPerRun); 321 } 322 }