1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.EmptyArrays;
19
20 import java.util.Arrays;
21 import java.util.Collections;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.ThreadFactory;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import static io.netty5.util.internal.ObjectUtil.checkPositive;
31
32
33
34
35
36 public class MultithreadEventExecutorGroup implements EventExecutorGroup {
37
38 private final EventExecutor[] children;
39 private final List<EventExecutor> readonlyChildren;
40 private final AtomicInteger terminatedChildren = new AtomicInteger();
41 private final Promise<Void> terminationFuture = GlobalEventExecutor.INSTANCE.newPromise();
42 private final boolean powerOfTwo;
43
44
45
46
47
48
49
50 public MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
51 this(nThreads, threadFactory, SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS,
52 RejectedExecutionHandlers.reject());
53 }
54
55
56
57
58
59
60
61 public MultithreadEventExecutorGroup(int nThreads, Executor executor) {
62 this(nThreads, executor, SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS,
63 RejectedExecutionHandlers.reject());
64 }
65
66
67
68
69
70
71
72
73
74 public MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory,
75 int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
76 this(nThreads, threadFactory, maxPendingTasks, rejectedHandler, EmptyArrays.EMPTY_OBJECTS);
77 }
78
79
80
81
82
83
84
85
86
87 public MultithreadEventExecutorGroup(int nThreads, Executor executor,
88 int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
89 this(nThreads, executor, maxPendingTasks, rejectedHandler, EmptyArrays.EMPTY_OBJECTS);
90 }
91
92
93
94
95
96
97
98
99
100
101
102 protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, int maxPendingTasks,
103 RejectedExecutionHandler rejectedHandler, Object... args) {
104 this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory),
105 maxPendingTasks, rejectedHandler, args);
106 }
107
108
109
110
111
112
113
114
115
116
117
118 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, int maxPendingTasks,
119 RejectedExecutionHandler rejectedHandler, Object... args) {
120 checkPositive(nThreads, "nThreads");
121
122 if (executor == null) {
123 executor = new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass()));
124 }
125
126 children = new EventExecutor[nThreads];
127 powerOfTwo = isPowerOfTwo(children.length);
128 for (int i = 0; i < nThreads; i ++) {
129 boolean success = false;
130 try {
131 children[i] = newChild(executor, maxPendingTasks, rejectedHandler, args);
132 success = true;
133 } catch (Exception e) {
134
135 throw new IllegalStateException("failed to create a child event executor", e);
136 } finally {
137 if (!success) {
138 for (int j = 0; j < i; j ++) {
139 children[j].shutdownGracefully();
140 }
141
142 for (int j = 0; j < i; j ++) {
143 EventExecutor e = children[j];
144 try {
145 while (!e.isTerminated()) {
146 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
147 }
148 } catch (InterruptedException interrupted) {
149
150 Thread.currentThread().interrupt();
151 break;
152 }
153 }
154 }
155 }
156 }
157
158 final FutureListener<Object> terminationListener = future -> {
159 if (terminatedChildren.incrementAndGet() == children.length) {
160 terminationFuture.setSuccess(null);
161 }
162 };
163
164 for (EventExecutor e: children) {
165 e.terminationFuture().addListener(terminationListener);
166 }
167 readonlyChildren = Collections.unmodifiableList(Arrays.asList(children));
168 }
169
170
171
172
173 private final AtomicLong idx = new AtomicLong();
174
175
176
177
178 protected final List<EventExecutor> executors() {
179 return readonlyChildren;
180 }
181
182
183
184
185
186 @Override
187 public EventExecutor next() {
188 if (powerOfTwo) {
189 return children[(int) idx.getAndIncrement() & children.length - 1];
190 }
191 return children[(int) Math.abs(idx.getAndIncrement() % children.length)];
192 }
193
194 private static boolean isPowerOfTwo(int val) {
195 return (val & -val) == val;
196 }
197
198 @Override
199 public Iterator<EventExecutor> iterator() {
200 return executors().iterator();
201 }
202
203
204
205
206
207 public final int executorCount() {
208 return executors().size();
209 }
210
211
212
213
214
215
216
217
218 protected EventExecutor newChild(Executor executor, int maxPendingTasks,
219 RejectedExecutionHandler rejectedExecutionHandler,
220 Object... args) {
221 assert args.length == 0;
222 return new SingleThreadEventExecutor(executor, maxPendingTasks, rejectedExecutionHandler);
223 }
224
225 @Override
226 public final Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
227 for (EventExecutor l: children) {
228 l.shutdownGracefully(quietPeriod, timeout, unit);
229 }
230 return terminationFuture();
231 }
232
233 @Override
234 public final Future<Void> terminationFuture() {
235 return terminationFuture.asFuture();
236 }
237
238 @Override
239 public final boolean isShuttingDown() {
240 for (EventExecutor l: children) {
241 if (!l.isShuttingDown()) {
242 return false;
243 }
244 }
245 return true;
246 }
247
248 @Override
249 public final boolean isShutdown() {
250 for (EventExecutor l: children) {
251 if (!l.isShutdown()) {
252 return false;
253 }
254 }
255 return true;
256 }
257
258 @Override
259 public final boolean isTerminated() {
260 for (EventExecutor l: children) {
261 if (!l.isTerminated()) {
262 return false;
263 }
264 }
265 return true;
266 }
267
268 @Override
269 public final boolean awaitTermination(long timeout, TimeUnit unit)
270 throws InterruptedException {
271 long deadline = System.nanoTime() + unit.toNanos(timeout);
272 loop: for (EventExecutor l: children) {
273 for (;;) {
274 long timeLeft = deadline - System.nanoTime();
275 if (timeLeft <= 0) {
276 break loop;
277 }
278 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
279 break;
280 }
281 }
282 }
283 return isTerminated();
284 }
285 }