1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.channel;
17
18 import io.netty5.util.NettyRuntime;
19 import io.netty5.util.concurrent.DefaultThreadFactory;
20 import io.netty5.util.concurrent.MultithreadEventExecutorGroup;
21 import io.netty5.util.concurrent.RejectedExecutionHandler;
22 import io.netty5.util.concurrent.RejectedExecutionHandlers;
23 import io.netty5.util.concurrent.ThreadPerTaskExecutor;
24 import io.netty5.util.internal.EmptyArrays;
25 import io.netty5.util.internal.SystemPropertyUtil;
26 import io.netty5.util.internal.logging.InternalLogger;
27 import io.netty5.util.internal.logging.InternalLoggerFactory;
28
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ThreadFactory;
35
36 /**
37 * {@link EventLoopGroup} implementation that will handle its tasks with multiple threads.
38 */
39 public class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
40
41 private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
42
43 public static final int DEFAULT_EVENT_LOOP_THREADS;
44
45 static {
46 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
47 "io.netty5.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
48
49 if (logger.isDebugEnabled()) {
50 logger.debug("-Dio.netty5.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
51 }
52 }
53
54 /**
55 * Create a new instance.
56 *
57 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new
58 * {@link IoHandler} instances that will handle the IO for the
59 * {@link EventLoop}.
60 */
61 public MultithreadEventLoopGroup(IoHandlerFactory ioHandlerFactory) {
62 this(0, (Executor) null, ioHandlerFactory);
63 }
64
65 /**
66 * Create a new instance.
67 *
68 * @param nThreads the number of threads that will be used by this instance.
69 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new
70 * {@link IoHandler} instances that will handle the IO for the
71 * {@link EventLoop}.
72 */
73 public MultithreadEventLoopGroup(int nThreads, IoHandlerFactory ioHandlerFactory) {
74 this(nThreads, (Executor) null, ioHandlerFactory);
75 }
76
77 /**
78 * Create a new instance.
79 *
80 * @param nThreads the number of threads that will be used by this instance.
81 * @param executor the {@link Executor} to use, or {@code null} if the default should be used.
82 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new
83 * {@link IoHandler} instances that will handle the IO for the
84 * {@link EventLoop}.
85 */
86 public MultithreadEventLoopGroup(int nThreads, Executor executor,
87 IoHandlerFactory ioHandlerFactory) {
88 this(nThreads, executor, ioHandlerFactory,
89 SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject(),
90 SingleThreadEventLoop.DEFAULT_MAX_TASKS_PER_RUN);
91 }
92
93 /**
94 * Create a new instance.
95 *
96 * @param nThreads the number of threads that will be used by this instance.
97 * @param threadFactory the {@link ThreadFactory} to use, or {@code null} if the default should be used.
98 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new
99 * {@link IoHandler} instances that will handle the IO for the
100 * {@link EventLoop}.
101 */
102 public MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory,
103 IoHandlerFactory ioHandlerFactory) {
104 this(nThreads, threadFactory, ioHandlerFactory,
105 SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
106 }
107
108 /**
109 * Create a new instance.
110 *
111 * @param executor the {@link Executor} to use, or {@code null} if the default should be used.
112 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new
113 * {@link IoHandler} instances that will handle the IO for the
114 * {@link EventLoop}.
115 */
116 public MultithreadEventLoopGroup(Executor executor,
117 IoHandlerFactory ioHandlerFactory) {
118 this(0, executor, ioHandlerFactory,
119 SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject(),
120 SingleThreadEventLoop.DEFAULT_MAX_TASKS_PER_RUN);
121 }
122
123 /**
124 * Create a new instance.
125 *
126 * @param threadFactory the {@link ThreadFactory} to use, or {@code null} if the default should be used.
127 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new
128 * {@link IoHandler} instances that will handle the IO for the
129 * {@link EventLoop}.
130 */
131 public MultithreadEventLoopGroup(ThreadFactory threadFactory,
132 IoHandlerFactory ioHandlerFactory) {
133 this(0, threadFactory, ioHandlerFactory,
134 SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
135 }
136
137 /**
138 * Create a new instance.
139 *
140 * @param nThreads the number of threads that will be used by this instance.
141 * @param executor the {@link Executor} to use, or {@code null} if the default should be used.
142 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new
143 * {@link IoHandler} instances that will handle the IO for the
144 * {@link EventLoop}.
145 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
146 * @param rejectedHandler the {@link RejectedExecutionHandler} to use.
147 */
148 public MultithreadEventLoopGroup(int nThreads, Executor executor,
149 IoHandlerFactory ioHandlerFactory,
150 int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
151 this(nThreads, executor, ioHandlerFactory, maxPendingTasks, rejectedHandler,
152 SingleThreadEventLoop.DEFAULT_MAX_TASKS_PER_RUN);
153 }
154
155 /**
156 * Create a new instance.
157 *
158 * @param nThreads the number of threads that will be used by this instance.
159 * @param threadFactory the {@link ThreadFactory} to use, or {@code null} if the default should be used.
160 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new
161 * {@link IoHandler} instances that will handle the IO for the
162 * {@link EventLoop}.
163 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
164 * @param rejectedHandler the {@link RejectedExecutionHandler} to use.
165 */
166 public MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory,
167 IoHandlerFactory ioHandlerFactory,
168 int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
169 this(nThreads, threadFactory, ioHandlerFactory, maxPendingTasks, rejectedHandler,
170 SingleThreadEventLoop.DEFAULT_MAX_TASKS_PER_RUN);
171 }
172
173 /**
174 * Create a new instance.
175 *
176 * @param nThreads the number of threads that will be used by this instance.
177 * @param executor the {@link Executor} to use, or {@code null} if the default should be used.
178 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new
179 * {@link IoHandler} instances that will handle the IO for the
180 * {@link EventLoop}.
181 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
182 * @param rejectedHandler the {@link RejectedExecutionHandler} to use.
183 * @param maxTasksPerRun the maximum number of tasks per {@link EventLoop} run that will be processed
184 * before trying to handle IO again.
185 */
186 public MultithreadEventLoopGroup(int nThreads, Executor executor,
187 IoHandlerFactory ioHandlerFactory,
188 int maxPendingTasks, RejectedExecutionHandler rejectedHandler,
189 int maxTasksPerRun) {
190 this(nThreads, executor, ioHandlerFactory,
191 maxPendingTasks, rejectedHandler, maxTasksPerRun, EmptyArrays.EMPTY_OBJECTS);
192 }
193
194 /**
195 * Create a new instance.
196 *
197 * @param nThreads the number of threads that will be used by this instance.
198 * @param threadFactory the {@link ThreadFactory} to use, or {@code null} if the default should be used.
199 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new
200 * {@link IoHandler} instances that will handle the IO for the
201 * {@link EventLoop}.
202 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
203 * @param rejectedHandler the {@link RejectedExecutionHandler} to use.
204 * @param maxTasksPerRun the maximum number of tasks per {@link EventLoop} run that will be processed
205 * before trying to handle IO again.
206 */
207 public MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory,
208 IoHandlerFactory ioHandlerFactory,
209 int maxPendingTasks, RejectedExecutionHandler rejectedHandler,
210 int maxTasksPerRun) {
211 this(nThreads, threadFactory, ioHandlerFactory,
212 maxPendingTasks, rejectedHandler, maxTasksPerRun, EmptyArrays.EMPTY_OBJECTS);
213 }
214
215 // Constructors provided for sub-classes that want to pass more args to newChild(...).
216
217 /**
218 * Create a new instance.
219 *
220 * @param nThreads the number of threads that will be used by this instance.
221 * @param executor the {@link Executor} to use, or {@code null} if the default should be used.
222 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new
223 * {@link IoHandler} instances that will handle the IO for the
224 * {@link EventLoop}.
225 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
226 * @param rejectedHandler the {@link RejectedExecutionHandler} to use.
227 * @param maxTasksPerRun the maximum number of tasks per {@link EventLoop} run that will be processed
228 * before trying to handle IO again.
229 * @param args extra arguments passed to {@link #newChild(Executor, int, RejectedExecutionHandler,
230 * IoHandler, int, Object...)}
231 */
232 protected MultithreadEventLoopGroup(int nThreads, Executor executor,
233 IoHandlerFactory ioHandlerFactory,
234 int maxPendingTasks, RejectedExecutionHandler rejectedHandler,
235 int maxTasksPerRun, Object... args) {
236 super(pickThreadCount(nThreads),
237 executor == null ? new ThreadPerTaskExecutor(newDefaultThreadFactory()) : executor,
238 maxPendingTasks, rejectedHandler, merge(ioHandlerFactory, maxTasksPerRun, args));
239 }
240
241 /**
242 * Create a new instance.
243 *
244 * @param nThreads the number of threads that will be used by this instance.
245 * @param threadFactory the {@link ThreadFactory} to use, or {@code null} if the default should be used.
246 * @param ioHandlerFactory the {@link IoHandlerFactory} to use for creating new
247 * {@link IoHandler} instances that will handle the IO for the
248 * {@link EventLoop}.
249 * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
250 * @param rejectedHandler the {@link RejectedExecutionHandler} to use.
251 * @param maxTasksPerRun the maximum number of tasks per {@link EventLoop} run that will be processed
252 * before trying to handle IO again.
253 * @param args extra arguments passed to {@link #newChild(Executor, int, RejectedExecutionHandler,
254 * IoHandler, int, Object...)}
255 */
256 protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory,
257 IoHandlerFactory ioHandlerFactory,
258 int maxPendingTasks, RejectedExecutionHandler rejectedHandler,
259 int maxTasksPerRun, Object... args) {
260 super(pickThreadCount(nThreads), threadFactory == null ? newDefaultThreadFactory() : threadFactory,
261 maxPendingTasks, rejectedHandler, merge(ioHandlerFactory, maxTasksPerRun, args));
262 }
263
264 private static ThreadFactory newDefaultThreadFactory() {
265 return new DefaultThreadFactory(MultithreadEventLoopGroup.class, Thread.MAX_PRIORITY);
266 }
267
268 /**
269 * Return the number of threads to use based on the given {@code nThreads}.
270 */
271 protected static int pickThreadCount(int nThreads) {
272 return nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads;
273 }
274
275 private static Object[] merge(IoHandlerFactory ioHandlerFactory,
276 int maxTasksPerRun, Object... args) {
277 List<Object> argList = new ArrayList<>(2 + args.length);
278 argList.add(ioHandlerFactory);
279 argList.add(maxTasksPerRun);
280 Collections.addAll(argList, args);
281 return argList.toArray();
282 }
283
284 @Override
285 public final EventLoop next() {
286 return (EventLoop) super.next();
287 }
288
289 @Override
290 protected final EventLoop newChild(Executor executor, int maxPendingTasks,
291 RejectedExecutionHandler rejectedExecutionHandler, Object... args) {
292 return newChild(executor, maxPendingTasks, rejectedExecutionHandler,
293 ((IoHandlerFactory) args[0]).newHandler(), (Integer) args[1],
294 Arrays.copyOfRange(args, 2, args.length));
295 }
296
297 /**
298 * Creates a new {@link EventLoop} to use.
299 *
300 * As this method is called from within the constructor you can only use the parameters passed into the method when
301 * overriding this method.
302 *
303 * @param executor the {@link Executor} to use for execution.
304 * @param maxPendingTasks the maximum number of pending tasks.
305 * @param rejectedExecutionHandler the {@link RejectedExecutionHandler} to use when the number of outstanding tasks
306 * reach {@code maxPendingTasks}.
307 * @param ioHandler the {@link IoHandler} to use.
308 * @param maxTasksPerRun the maximum number of tasks per {@link EventLoop} run that will be processed
309 * before trying to handle IO again.
310 * @param args any extra args needed to construct the {@link EventLoop}. This will be an empty
311 * array if not sub-classes and extra arguments are given.
312 * @return the {@link EventLoop} to use.
313 */
314 protected EventLoop newChild(Executor executor, int maxPendingTasks,
315 RejectedExecutionHandler rejectedExecutionHandler,
316 IoHandler ioHandler, int maxTasksPerRun,
317 Object... args) {
318 assert args.length == 0;
319 return new SingleThreadEventLoop(executor, ioHandler, maxPendingTasks,
320 rejectedExecutionHandler, maxTasksPerRun);
321 }
322 }