View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util.concurrent;
17  
18  import io.netty5.util.internal.EmptyArrays;
19  
20  import java.util.Arrays;
21  import java.util.Collections;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.ThreadFactory;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import static io.netty5.util.internal.ObjectUtil.checkPositive;
31  
32  /**
33   * {@link EventExecutorGroup} implementation that handles their tasks with multiple threads at
34   * the same time.
35   */
36  public class MultithreadEventExecutorGroup implements EventExecutorGroup {
37  
38      private final EventExecutor[] children;
39      private final List<EventExecutor> readonlyChildren;
40      private final AtomicInteger terminatedChildren = new AtomicInteger();
41      private final Promise<Void> terminationFuture = GlobalEventExecutor.INSTANCE.newPromise();
42      private final boolean powerOfTwo;
43  
44      /**
45       * Create a new instance.
46       *
47       * @param nThreads          the number of threads that will be used by this instance.
48       * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
49       */
50      public MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
51          this(nThreads, threadFactory, SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS,
52                  RejectedExecutionHandlers.reject());
53      }
54  
55      /**
56       * Create a new instance.
57       *
58       * @param nThreads          the number of threads that will be used by this instance.
59       * @param executor          the {@link Executor} to use, or {@code null} if the default should be used.
60       */
61      public MultithreadEventExecutorGroup(int nThreads, Executor executor) {
62          this(nThreads, executor, SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS,
63                  RejectedExecutionHandlers.reject());
64      }
65  
66      /**
67       * Create a new instance.
68       *
69       * @param nThreads          the number of threads that will be used by this instance.
70       * @param threadFactory     the {@link ThreadFactory} to use, or {@code null} if the default should be used.
71       * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
72       * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
73       */
74      public MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory,
75                                           int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
76          this(nThreads, threadFactory, maxPendingTasks, rejectedHandler, EmptyArrays.EMPTY_OBJECTS);
77      }
78  
79      /**
80       * Create a new instance.
81       *
82       * @param nThreads          the number of threads that will be used by this instance.
83       * @param executor          the Executor to use, or {@code null} if the default should be used.
84       * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
85       * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
86       */
87      public MultithreadEventExecutorGroup(int nThreads, Executor executor,
88                                           int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
89          this(nThreads, executor, maxPendingTasks, rejectedHandler, EmptyArrays.EMPTY_OBJECTS);
90      }
91  
92      /**
93       * Create a new instance.
94       *
95       * @param nThreads          the number of threads that will be used by this instance.
96       * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
97       * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
98       * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
99       * @param args              arguments which will passed to each {@link #newChild(Executor, int,
100      * RejectedExecutionHandler, Object...)} call
101      */
102     protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, int maxPendingTasks,
103                                             RejectedExecutionHandler rejectedHandler, Object... args) {
104         this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory),
105                 maxPendingTasks, rejectedHandler, args);
106     }
107 
108     /**
109      * Create a new instance.
110      *
111      * @param nThreads          the number of threads that will be used by this instance.
112      * @param executor          the Executor to use, or {@code null} if the default should be used.
113      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
114      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
115      * @param args              arguments which will passed to each {@link #newChild(Executor, int,
116      * RejectedExecutionHandler, Object...)} call
117      */
118     protected MultithreadEventExecutorGroup(int nThreads, Executor executor, int maxPendingTasks,
119                                             RejectedExecutionHandler rejectedHandler, Object... args) {
120         checkPositive(nThreads, "nThreads");
121 
122         if (executor == null) {
123             executor = new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass()));
124         }
125 
126         children = new EventExecutor[nThreads];
127         powerOfTwo = isPowerOfTwo(children.length);
128         for (int i = 0; i < nThreads; i ++) {
129             boolean success = false;
130             try {
131                 children[i] = newChild(executor, maxPendingTasks, rejectedHandler, args);
132                 success = true;
133             } catch (Exception e) {
134                 // TODO: Think about if this is a good exception type
135                 throw new IllegalStateException("failed to create a child event executor", e);
136             } finally {
137                 if (!success) {
138                     for (int j = 0; j < i; j ++) {
139                         children[j].shutdownGracefully();
140                     }
141 
142                     for (int j = 0; j < i; j ++) {
143                         EventExecutor e = children[j];
144                         try {
145                             while (!e.isTerminated()) {
146                                 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
147                             }
148                         } catch (InterruptedException interrupted) {
149                             // Let the caller handle the interruption.
150                             Thread.currentThread().interrupt();
151                             break;
152                         }
153                     }
154                 }
155             }
156         }
157 
158         final FutureListener<Object> terminationListener = future -> {
159             if (terminatedChildren.incrementAndGet() == children.length) {
160                 terminationFuture.setSuccess(null);
161             }
162         };
163 
164         for (EventExecutor e: children) {
165             e.terminationFuture().addListener(terminationListener);
166         }
167         readonlyChildren = Collections.unmodifiableList(Arrays.asList(children));
168     }
169 
170     // Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary.
171     // The 64-bit long solves this by placing the overflow so far into the future, that no system
172     // will encounter this in practice.
173     private final AtomicLong idx = new AtomicLong();
174 
175     /**
176      * The {@link EventExecutor}s that are used by this {@link MultithreadEventExecutorGroup}.
177      */
178     protected final List<EventExecutor> executors() {
179         return readonlyChildren;
180     }
181 
182     /**
183      * Returns the next {@link EventExecutor} to use. The default implementation will use round-robin, but you may
184      * override this to change the selection algorithm.
185      */
186     @Override
187     public EventExecutor next() {
188         if (powerOfTwo) {
189             return children[(int) idx.getAndIncrement() & children.length - 1];
190         }
191         return children[(int) Math.abs(idx.getAndIncrement() % children.length)];
192     }
193 
194     private static boolean isPowerOfTwo(int val) {
195         return (val & -val) == val;
196     }
197 
198     @Override
199     public Iterator<EventExecutor> iterator() {
200         return executors().iterator();
201     }
202 
203     /**
204      * Return the number of {@link EventExecutor} this implementation uses. This number is the maps
205      * 1:1 to the threads it use.
206      */
207     public final int executorCount() {
208         return executors().size();
209     }
210 
211     /**
212      * Create a new EventExecutor which will later then accessible via the {@link #next()} method. This method will be
213      * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
214      *
215      * As this method is called from within the constructor you can only use the parameters passed into the method when
216      * overriding this method.
217      */
218     protected EventExecutor newChild(Executor executor,  int maxPendingTasks,
219                                      RejectedExecutionHandler rejectedExecutionHandler,
220                                      Object... args) {
221         assert args.length == 0;
222         return new SingleThreadEventExecutor(executor, maxPendingTasks, rejectedExecutionHandler);
223     }
224 
225     @Override
226     public final Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
227         for (EventExecutor l: children) {
228             l.shutdownGracefully(quietPeriod, timeout, unit);
229         }
230         return terminationFuture();
231     }
232 
233     @Override
234     public final Future<Void> terminationFuture() {
235         return terminationFuture.asFuture();
236     }
237 
238     @Override
239     public final boolean isShuttingDown() {
240         for (EventExecutor l: children) {
241             if (!l.isShuttingDown()) {
242                 return false;
243             }
244         }
245         return true;
246     }
247 
248     @Override
249     public final boolean isShutdown() {
250         for (EventExecutor l: children) {
251             if (!l.isShutdown()) {
252                 return false;
253             }
254         }
255         return true;
256     }
257 
258     @Override
259     public final boolean isTerminated() {
260         for (EventExecutor l: children) {
261             if (!l.isTerminated()) {
262                 return false;
263             }
264         }
265         return true;
266     }
267 
268     @Override
269     public final boolean awaitTermination(long timeout, TimeUnit unit)
270             throws InterruptedException {
271         long deadline = System.nanoTime() + unit.toNanos(timeout);
272         loop: for (EventExecutor l: children) {
273             for (;;) {
274                 long timeLeft = deadline - System.nanoTime();
275                 if (timeLeft <= 0) {
276                     break loop;
277                 }
278                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
279                     break;
280                 }
281             }
282         }
283         return isTerminated();
284     }
285 }