1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.concurrent.AutoScalingEventExecutorChooserFactory.AutoScalingUtilizationMetric;
19 import io.netty.util.concurrent.EventExecutorChooserFactory.ObservableEventExecutorChooser;
20
21 import java.util.Collections;
22 import java.util.Iterator;
23 import java.util.LinkedHashSet;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import static io.netty.util.internal.ObjectUtil.checkPositive;
32
33
34
35
36
37 public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
38
39 private final EventExecutor[] children;
40 private final Set<EventExecutor> readonlyChildren;
41 private final AtomicInteger terminatedChildren = new AtomicInteger();
42 private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
43 private final EventExecutorChooserFactory.EventExecutorChooser chooser;
44
45
46
47
48
49
50
51
52 protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
53 this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
54 }
55
56
57
58
59
60
61
62
63 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
64 this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
65 }
66
67
68
69
70
71
72
73
74
75 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
76 EventExecutorChooserFactory chooserFactory, Object... args) {
77 checkPositive(nThreads, "nThreads");
78
79 if (executor == null) {
80 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
81 }
82
83 children = new EventExecutor[nThreads];
84
85 for (int i = 0; i < nThreads; i ++) {
86 boolean success = false;
87 try {
88 children[i] = newChild(executor, args);
89 success = true;
90 } catch (Exception e) {
91
92 throw new IllegalStateException("failed to create a child event loop", e);
93 } finally {
94 if (!success) {
95 for (int j = 0; j < i; j ++) {
96 children[j].shutdownGracefully();
97 }
98
99 for (int j = 0; j < i; j ++) {
100 EventExecutor e = children[j];
101 try {
102 while (!e.isTerminated()) {
103 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
104 }
105 } catch (InterruptedException interrupted) {
106
107 Thread.currentThread().interrupt();
108 break;
109 }
110 }
111 }
112 }
113 }
114
115 chooser = chooserFactory.newChooser(children);
116
117 final FutureListener<Object> terminationListener = new FutureListener<Object>() {
118 @Override
119 public void operationComplete(Future<Object> future) throws Exception {
120 if (terminatedChildren.incrementAndGet() == children.length) {
121 terminationFuture.setSuccess(null);
122 }
123 }
124 };
125
126 for (EventExecutor e: children) {
127 e.terminationFuture().addListener(terminationListener);
128 }
129
130 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
131 Collections.addAll(childrenSet, children);
132 readonlyChildren = Collections.unmodifiableSet(childrenSet);
133 }
134
135 protected ThreadFactory newDefaultThreadFactory() {
136 return new DefaultThreadFactory(getClass());
137 }
138
139 @Override
140 public EventExecutor next() {
141 return chooser.next();
142 }
143
144 @Override
145 public Iterator<EventExecutor> iterator() {
146 return readonlyChildren.iterator();
147 }
148
149
150
151
152
153 public final int executorCount() {
154 return children.length;
155 }
156
157
158
159
160
161
162
163
164 public int activeExecutorCount() {
165 if (chooser instanceof ObservableEventExecutorChooser) {
166 return ((ObservableEventExecutorChooser) chooser).activeExecutorCount();
167 }
168 return executorCount();
169 }
170
171
172
173
174
175
176
177 public List<AutoScalingUtilizationMetric> executorUtilizations() {
178 if (chooser instanceof ObservableEventExecutorChooser) {
179 return ((ObservableEventExecutorChooser) chooser).executorUtilizations();
180 }
181 return Collections.emptyList();
182 }
183
184
185
186
187
188
189 protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
190
191 @Override
192 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
193 for (EventExecutor l: children) {
194 l.shutdownGracefully(quietPeriod, timeout, unit);
195 }
196 return terminationFuture();
197 }
198
199 @Override
200 public Future<?> terminationFuture() {
201 return terminationFuture;
202 }
203
204 @Override
205 @Deprecated
206 public void shutdown() {
207 for (EventExecutor l: children) {
208 l.shutdown();
209 }
210 }
211
212 @Override
213 public boolean isShuttingDown() {
214 for (EventExecutor l: children) {
215 if (!l.isShuttingDown()) {
216 return false;
217 }
218 }
219 return true;
220 }
221
222 @Override
223 public boolean isShutdown() {
224 for (EventExecutor l: children) {
225 if (!l.isShutdown()) {
226 return false;
227 }
228 }
229 return true;
230 }
231
232 @Override
233 public boolean isTerminated() {
234 for (EventExecutor l: children) {
235 if (!l.isTerminated()) {
236 return false;
237 }
238 }
239 return true;
240 }
241
242 @Override
243 public boolean awaitTermination(long timeout, TimeUnit unit)
244 throws InterruptedException {
245 long deadline = System.nanoTime() + unit.toNanos(timeout);
246 loop: for (EventExecutor l: children) {
247 for (;;) {
248 long timeLeft = deadline - System.nanoTime();
249 if (timeLeft <= 0) {
250 break loop;
251 }
252 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
253 break;
254 }
255 }
256 }
257 return isTerminated();
258 }
259 }