View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel;
17  
18  import io.netty5.util.NettyRuntime;
19  import io.netty5.util.concurrent.DefaultThreadFactory;
20  import io.netty5.util.concurrent.MultithreadEventExecutorGroup;
21  import io.netty5.util.concurrent.RejectedExecutionHandler;
22  import io.netty5.util.concurrent.RejectedExecutionHandlers;
23  import io.netty5.util.concurrent.ThreadPerTaskExecutor;
24  import io.netty5.util.internal.EmptyArrays;
25  import io.netty5.util.internal.SystemPropertyUtil;
26  import io.netty5.util.internal.logging.InternalLogger;
27  import io.netty5.util.internal.logging.InternalLoggerFactory;
28  
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.List;
33  import java.util.concurrent.Executor;
34  import java.util.concurrent.ThreadFactory;
35  
36  /**
37   * {@link EventLoopGroup} implementation that will handle its tasks with multiple threads.
38   */
39  public class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
40  
41      private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
42  
43      public static final int DEFAULT_EVENT_LOOP_THREADS;
44  
45      static {
46          DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
47                  "io.netty5.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
48  
49          if (logger.isDebugEnabled()) {
50              logger.debug("-Dio.netty5.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
51          }
52      }
53  
54      /**
55       * Create a new instance.
56       *
57       * @param ioHandlerFactory  the {@link IoHandlerFactory} to use for creating new
58       *                          {@link IoHandler} instances that will handle the IO for the
59       *                          {@link EventLoop}.
60       */
61      public MultithreadEventLoopGroup(IoHandlerFactory ioHandlerFactory) {
62          this(0, (Executor) null, ioHandlerFactory);
63      }
64  
65      /**
66       * Create a new instance.
67       *
68       * @param nThreads          the number of threads that will be used by this instance.
69       * @param ioHandlerFactory  the {@link IoHandlerFactory} to use for creating new
70       *                          {@link IoHandler} instances that will handle the IO for the
71       *                          {@link EventLoop}.
72       */
73      public MultithreadEventLoopGroup(int nThreads, IoHandlerFactory ioHandlerFactory) {
74          this(nThreads, (Executor) null, ioHandlerFactory);
75      }
76  
77      /**
78       * Create a new instance.
79       *
80       * @param nThreads          the number of threads that will be used by this instance.
81       * @param executor          the {@link Executor} to use, or {@code null} if the default should be used.
82       * @param ioHandlerFactory  the {@link IoHandlerFactory} to use for creating new
83       *                          {@link IoHandler} instances that will handle the IO for the
84       *                          {@link EventLoop}.
85       */
86      public MultithreadEventLoopGroup(int nThreads, Executor executor,
87                                       IoHandlerFactory ioHandlerFactory) {
88          this(nThreads, executor, ioHandlerFactory,
89                  SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject(),
90                  SingleThreadEventLoop.DEFAULT_MAX_TASKS_PER_RUN);
91      }
92  
93      /**
94       * Create a new instance.
95       *
96       * @param nThreads          the number of threads that will be used by this instance.
97       * @param threadFactory     the {@link ThreadFactory} to use, or {@code null} if the default should be used.
98       * @param ioHandlerFactory  the {@link IoHandlerFactory} to use for creating new
99       *                          {@link IoHandler} instances that will handle the IO for the
100      *                          {@link EventLoop}.
101      */
102     public MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory,
103                                      IoHandlerFactory ioHandlerFactory) {
104         this(nThreads, threadFactory, ioHandlerFactory,
105                 SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
106     }
107 
108     /**
109      * Create a new instance.
110      *
111      * @param executor          the {@link Executor} to use, or {@code null} if the default should be used.
112      * @param ioHandlerFactory  the {@link IoHandlerFactory} to use for creating new
113      *                          {@link IoHandler} instances that will handle the IO for the
114      *                          {@link EventLoop}.
115      */
116     public MultithreadEventLoopGroup(Executor executor,
117                                      IoHandlerFactory ioHandlerFactory) {
118         this(0, executor, ioHandlerFactory,
119                 SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject(),
120                 SingleThreadEventLoop.DEFAULT_MAX_TASKS_PER_RUN);
121     }
122 
123     /**
124      * Create a new instance.
125      *
126      * @param threadFactory     the {@link ThreadFactory} to use, or {@code null} if the default should be used.
127      * @param ioHandlerFactory  the {@link IoHandlerFactory} to use for creating new
128      *                          {@link IoHandler} instances that will handle the IO for the
129      *                          {@link EventLoop}.
130      */
131     public MultithreadEventLoopGroup(ThreadFactory threadFactory,
132                                      IoHandlerFactory ioHandlerFactory) {
133         this(0, threadFactory, ioHandlerFactory,
134                 SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
135     }
136 
137     /**
138      * Create a new instance.
139      *
140      * @param nThreads          the number of threads that will be used by this instance.
141      * @param executor          the {@link Executor} to use, or {@code null} if the default should be used.
142      * @param ioHandlerFactory  the {@link IoHandlerFactory} to use for creating new
143      *                          {@link IoHandler} instances that will handle the IO for the
144      *                          {@link EventLoop}.
145      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
146      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
147      */
148     public MultithreadEventLoopGroup(int nThreads, Executor executor,
149                                      IoHandlerFactory ioHandlerFactory,
150                                      int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
151         this(nThreads, executor, ioHandlerFactory, maxPendingTasks, rejectedHandler,
152                 SingleThreadEventLoop.DEFAULT_MAX_TASKS_PER_RUN);
153     }
154 
155     /**
156      * Create a new instance.
157      *
158      * @param nThreads          the number of threads that will be used by this instance.
159      * @param threadFactory     the {@link ThreadFactory} to use, or {@code null} if the default should be used.
160      * @param ioHandlerFactory  the {@link IoHandlerFactory} to use for creating new
161      *                          {@link IoHandler} instances that will handle the IO for the
162      *                          {@link EventLoop}.
163      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
164      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
165      */
166     public MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory,
167                                      IoHandlerFactory ioHandlerFactory,
168                                      int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
169         this(nThreads, threadFactory, ioHandlerFactory, maxPendingTasks, rejectedHandler,
170                 SingleThreadEventLoop.DEFAULT_MAX_TASKS_PER_RUN);
171     }
172 
173     /**
174      * Create a new instance.
175      *
176      * @param nThreads          the number of threads that will be used by this instance.
177      * @param executor          the {@link Executor} to use, or {@code null} if the default should be used.
178      * @param ioHandlerFactory  the {@link IoHandlerFactory} to use for creating new
179      *                          {@link IoHandler} instances that will handle the IO for the
180      *                          {@link EventLoop}.
181      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
182      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
183      * @param maxTasksPerRun    the maximum number of tasks per {@link EventLoop} run that will be processed
184      *                          before trying to handle IO again.
185      */
186     public MultithreadEventLoopGroup(int nThreads, Executor executor,
187                                      IoHandlerFactory ioHandlerFactory,
188                                      int maxPendingTasks, RejectedExecutionHandler rejectedHandler,
189                                      int maxTasksPerRun) {
190         this(nThreads, executor, ioHandlerFactory,
191                 maxPendingTasks, rejectedHandler, maxTasksPerRun, EmptyArrays.EMPTY_OBJECTS);
192     }
193 
194     /**
195      * Create a new instance.
196      *
197      * @param nThreads          the number of threads that will be used by this instance.
198      * @param threadFactory     the {@link ThreadFactory} to use, or {@code null} if the default should be used.
199      * @param ioHandlerFactory  the {@link IoHandlerFactory} to use for creating new
200      *                          {@link IoHandler} instances that will handle the IO for the
201      *                          {@link EventLoop}.
202      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
203      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
204      * @param maxTasksPerRun    the maximum number of tasks per {@link EventLoop} run that will be processed
205      *                          before trying to handle IO again.
206      */
207     public MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory,
208                                      IoHandlerFactory ioHandlerFactory,
209                                      int maxPendingTasks, RejectedExecutionHandler rejectedHandler,
210                                      int maxTasksPerRun) {
211         this(nThreads, threadFactory, ioHandlerFactory,
212                 maxPendingTasks, rejectedHandler, maxTasksPerRun, EmptyArrays.EMPTY_OBJECTS);
213     }
214 
215     // Constructors provided for sub-classes that want to pass more args to newChild(...).
216 
217     /**
218      * Create a new instance.
219      *
220      * @param nThreads          the number of threads that will be used by this instance.
221      * @param executor          the {@link Executor} to use, or {@code null} if the default should be used.
222      * @param ioHandlerFactory  the {@link IoHandlerFactory} to use for creating new
223      *                          {@link IoHandler} instances that will handle the IO for the
224      *                          {@link EventLoop}.
225      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
226      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
227      * @param maxTasksPerRun    the maximum number of tasks per {@link EventLoop} run that will be processed
228      *                          before trying to handle IO again.
229      * @param args              extra arguments passed to {@link #newChild(Executor, int, RejectedExecutionHandler,
230      *                          IoHandler, int, Object...)}
231      */
232     protected MultithreadEventLoopGroup(int nThreads, Executor executor,
233                                      IoHandlerFactory ioHandlerFactory,
234                                      int maxPendingTasks, RejectedExecutionHandler rejectedHandler,
235                                      int maxTasksPerRun, Object... args) {
236         super(pickThreadCount(nThreads),
237                 executor == null ? new ThreadPerTaskExecutor(newDefaultThreadFactory()) : executor,
238                 maxPendingTasks, rejectedHandler, merge(ioHandlerFactory, maxTasksPerRun, args));
239     }
240 
241     /**
242      * Create a new instance.
243      *
244      * @param nThreads          the number of threads that will be used by this instance.
245      * @param threadFactory     the {@link ThreadFactory} to use, or {@code null} if the default should be used.
246      * @param ioHandlerFactory  the {@link IoHandlerFactory} to use for creating new
247      *                          {@link IoHandler} instances that will handle the IO for the
248      *                          {@link EventLoop}.
249      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
250      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
251      * @param maxTasksPerRun    the maximum number of tasks per {@link EventLoop} run that will be processed
252      *                          before trying to handle IO again.
253      * @param args              extra arguments passed to {@link #newChild(Executor, int, RejectedExecutionHandler,
254      *                          IoHandler, int, Object...)}
255      */
256     protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory,
257                                      IoHandlerFactory ioHandlerFactory,
258                                      int maxPendingTasks, RejectedExecutionHandler rejectedHandler,
259                                      int maxTasksPerRun, Object... args) {
260         super(pickThreadCount(nThreads), threadFactory == null ? newDefaultThreadFactory() : threadFactory,
261                 maxPendingTasks, rejectedHandler, merge(ioHandlerFactory, maxTasksPerRun, args));
262     }
263 
264     private static ThreadFactory newDefaultThreadFactory() {
265         return new DefaultThreadFactory(MultithreadEventLoopGroup.class, Thread.MAX_PRIORITY);
266     }
267 
268     /**
269      * Return the number of threads to use based on the given {@code nThreads}.
270      */
271     protected static int pickThreadCount(int nThreads) {
272         return nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads;
273     }
274 
275     private static Object[] merge(IoHandlerFactory ioHandlerFactory,
276                                   int maxTasksPerRun, Object... args) {
277         List<Object> argList = new ArrayList<>(2 + args.length);
278         argList.add(ioHandlerFactory);
279         argList.add(maxTasksPerRun);
280         Collections.addAll(argList, args);
281         return argList.toArray();
282     }
283 
284     @Override
285     public final EventLoop next() {
286         return (EventLoop) super.next();
287     }
288 
289     @Override
290     protected final EventLoop newChild(Executor executor, int maxPendingTasks,
291                                        RejectedExecutionHandler rejectedExecutionHandler, Object... args) {
292         return newChild(executor, maxPendingTasks, rejectedExecutionHandler,
293                 ((IoHandlerFactory) args[0]).newHandler(), (Integer) args[1],
294                 Arrays.copyOfRange(args, 2, args.length));
295     }
296 
297     /**
298      * Creates a new {@link EventLoop} to use.
299      *
300      * As this method is called from within the constructor you can only use the parameters passed into the method when
301      * overriding this method.
302      *
303      * @param executor                  the {@link Executor} to use for execution.
304      * @param maxPendingTasks           the maximum number of pending tasks.
305      * @param rejectedExecutionHandler  the {@link RejectedExecutionHandler} to use when the number of outstanding tasks
306      *                                  reach {@code maxPendingTasks}.
307      * @param ioHandler                 the {@link IoHandler} to use.
308      * @param maxTasksPerRun            the maximum number of tasks per {@link EventLoop} run that will be processed
309      *                                  before trying to handle IO again.
310      * @param args                      any extra args needed to construct the {@link EventLoop}. This will be an empty
311      *                                  array if not sub-classes and extra arguments are given.
312      * @return                          the {@link EventLoop} to use.
313      */
314     protected EventLoop newChild(Executor executor, int maxPendingTasks,
315                                  RejectedExecutionHandler rejectedExecutionHandler,
316                                  IoHandler ioHandler, int maxTasksPerRun,
317                                  Object... args) {
318         assert args.length == 0;
319         return new SingleThreadEventLoop(executor, ioHandler, maxPendingTasks,
320                 rejectedExecutionHandler, maxTasksPerRun);
321     }
322 }