1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.concurrent.AutoScalingEventExecutorChooserFactory.AutoScalingUtilizationMetric;
19 import io.netty.util.concurrent.EventExecutorChooserFactory.ObservableEventExecutorChooser;
20
21 import java.util.Collections;
22 import java.util.Iterator;
23 import java.util.LinkedHashSet;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import static io.netty.util.internal.ObjectUtil.checkPositive;
32
33
34
35
36
37 public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
38
39 private final EventExecutor[] children;
40 private final Set<EventExecutor> readonlyChildren;
41 private final AtomicInteger terminatedChildren = new AtomicInteger();
42 private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
43 private final EventExecutorChooserFactory.EventExecutorChooser chooser;
44
45
46
47
48
49
50
51
52 protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
53 this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
54 }
55
56
57
58
59
60
61
62
63 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
64 this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
65 }
66
67
68
69
70
71
72
73
74
75 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
76 EventExecutorChooserFactory chooserFactory, Object... args) {
77 checkPositive(nThreads, "nThreads");
78
79 if (executor == null) {
80 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
81 }
82
83 children = new EventExecutor[nThreads];
84
85 for (int i = 0; i < nThreads; i ++) {
86 boolean success = false;
87 try {
88 children[i] = newChild(executor, args);
89 success = true;
90 } catch (Exception e) {
91
92 throw new IllegalStateException("failed to create a child event loop", e);
93 } finally {
94 if (!success) {
95 for (int j = 0; j < i; j ++) {
96 children[j].shutdownGracefully();
97 }
98
99 for (int j = 0; j < i; j ++) {
100 EventExecutor e = children[j];
101 try {
102 while (!e.isTerminated()) {
103 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
104 }
105 } catch (InterruptedException interrupted) {
106
107 Thread.currentThread().interrupt();
108 break;
109 }
110 }
111 }
112 }
113 }
114
115 chooser = chooserFactory.newChooser(children);
116
117 final FutureListener<Object> terminationListener = future -> {
118 if (terminatedChildren.incrementAndGet() == children.length) {
119 terminationFuture.setSuccess(null);
120 }
121 };
122
123 for (EventExecutor e: children) {
124 e.terminationFuture().addListener(terminationListener);
125 }
126
127 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
128 Collections.addAll(childrenSet, children);
129 readonlyChildren = Collections.unmodifiableSet(childrenSet);
130 }
131
132 protected ThreadFactory newDefaultThreadFactory() {
133 return new DefaultThreadFactory(getClass());
134 }
135
136 @Override
137 public EventExecutor next() {
138 return chooser.next();
139 }
140
141 @Override
142 public Iterator<EventExecutor> iterator() {
143 return readonlyChildren.iterator();
144 }
145
146
147
148
149
150 public final int executorCount() {
151 return children.length;
152 }
153
154
155
156
157
158
159
160
161 public int activeExecutorCount() {
162 if (chooser instanceof ObservableEventExecutorChooser) {
163 return ((ObservableEventExecutorChooser) chooser).activeExecutorCount();
164 }
165 return executorCount();
166 }
167
168
169
170
171
172
173
174 public List<AutoScalingUtilizationMetric> executorUtilizations() {
175 if (chooser instanceof ObservableEventExecutorChooser) {
176 return ((ObservableEventExecutorChooser) chooser).executorUtilizations();
177 }
178 return Collections.emptyList();
179 }
180
181
182
183
184
185
186 protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
187
188 @Override
189 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
190 for (EventExecutor l: children) {
191 l.shutdownGracefully(quietPeriod, timeout, unit);
192 }
193 return terminationFuture();
194 }
195
196 @Override
197 public Future<?> terminationFuture() {
198 return terminationFuture;
199 }
200
201 @Override
202 @Deprecated
203 public void shutdown() {
204 for (EventExecutor l: children) {
205 l.shutdown();
206 }
207 }
208
209 @Override
210 public boolean isShuttingDown() {
211 for (EventExecutor l: children) {
212 if (!l.isShuttingDown()) {
213 return false;
214 }
215 }
216 return true;
217 }
218
219 @Override
220 public boolean isShutdown() {
221 for (EventExecutor l: children) {
222 if (!l.isShutdown()) {
223 return false;
224 }
225 }
226 return true;
227 }
228
229 @Override
230 public boolean isTerminated() {
231 for (EventExecutor l: children) {
232 if (!l.isTerminated()) {
233 return false;
234 }
235 }
236 return true;
237 }
238
239 @Override
240 public boolean awaitTermination(long timeout, TimeUnit unit)
241 throws InterruptedException {
242 long deadline = System.nanoTime() + unit.toNanos(timeout);
243 loop: for (EventExecutor l: children) {
244 for (;;) {
245 long timeLeft = deadline - System.nanoTime();
246 if (timeLeft <= 0) {
247 break loop;
248 }
249 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
250 break;
251 }
252 }
253 }
254 return isTerminated();
255 }
256 }