1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.LinkedHashMap;
21 import java.util.Set;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26
27
28
29
30 public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
31
32 private final EventExecutor[] children;
33 private final AtomicInteger childIndex = new AtomicInteger();
34 private final AtomicInteger terminatedChildren = new AtomicInteger();
35 private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
36 private final EventExecutorChooser chooser;
37
38
39
40
41
42
43
44
45 protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
46 if (nThreads <= 0) {
47 throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
48 }
49
50 if (threadFactory == null) {
51 threadFactory = newDefaultThreadFactory();
52 }
53
54 children = new SingleThreadEventExecutor[nThreads];
55 if (isPowerOfTwo(children.length)) {
56 chooser = new PowerOfTwoEventExecutorChooser();
57 } else {
58 chooser = new GenericEventExecutorChooser();
59 }
60
61 for (int i = 0; i < nThreads; i ++) {
62 boolean success = false;
63 try {
64 children[i] = newChild(threadFactory, args);
65 success = true;
66 } catch (Exception e) {
67
68 throw new IllegalStateException("failed to create a child event loop", e);
69 } finally {
70 if (!success) {
71 for (int j = 0; j < i; j ++) {
72 children[j].shutdownGracefully();
73 }
74
75 for (int j = 0; j < i; j ++) {
76 EventExecutor e = children[j];
77 try {
78 while (!e.isTerminated()) {
79 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
80 }
81 } catch (InterruptedException interrupted) {
82 Thread.currentThread().interrupt();
83 break;
84 }
85 }
86 }
87 }
88 }
89
90 final FutureListener<Object> terminationListener = new FutureListener<Object>() {
91 @Override
92 public void operationComplete(Future<Object> future) throws Exception {
93 if (terminatedChildren.incrementAndGet() == children.length) {
94 terminationFuture.setSuccess(null);
95 }
96 }
97 };
98
99 for (EventExecutor e: children) {
100 e.terminationFuture().addListener(terminationListener);
101 }
102 }
103
104 protected ThreadFactory newDefaultThreadFactory() {
105 return new DefaultThreadFactory(getClass());
106 }
107
108 @Override
109 public EventExecutor next() {
110 return chooser.next();
111 }
112
113 @Override
114 public Iterator<EventExecutor> iterator() {
115 return children().iterator();
116 }
117
118
119
120
121
122 public final int executorCount() {
123 return children.length;
124 }
125
126
127
128
129 protected Set<EventExecutor> children() {
130 Set<EventExecutor> children = Collections.newSetFromMap(new LinkedHashMap<EventExecutor, Boolean>());
131 Collections.addAll(children, this.children);
132 return children;
133 }
134
135
136
137
138
139
140 protected abstract EventExecutor newChild(
141 ThreadFactory threadFactory, Object... args) throws Exception;
142
143 @Override
144 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
145 for (EventExecutor l: children) {
146 l.shutdownGracefully(quietPeriod, timeout, unit);
147 }
148 return terminationFuture();
149 }
150
151 @Override
152 public Future<?> terminationFuture() {
153 return terminationFuture;
154 }
155
156 @Override
157 @Deprecated
158 public void shutdown() {
159 for (EventExecutor l: children) {
160 l.shutdown();
161 }
162 }
163
164 @Override
165 public boolean isShuttingDown() {
166 for (EventExecutor l: children) {
167 if (!l.isShuttingDown()) {
168 return false;
169 }
170 }
171 return true;
172 }
173
174 @Override
175 public boolean isShutdown() {
176 for (EventExecutor l: children) {
177 if (!l.isShutdown()) {
178 return false;
179 }
180 }
181 return true;
182 }
183
184 @Override
185 public boolean isTerminated() {
186 for (EventExecutor l: children) {
187 if (!l.isTerminated()) {
188 return false;
189 }
190 }
191 return true;
192 }
193
194 @Override
195 public boolean awaitTermination(long timeout, TimeUnit unit)
196 throws InterruptedException {
197 long deadline = System.nanoTime() + unit.toNanos(timeout);
198 loop: for (EventExecutor l: children) {
199 for (;;) {
200 long timeLeft = deadline - System.nanoTime();
201 if (timeLeft <= 0) {
202 break loop;
203 }
204 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
205 break;
206 }
207 }
208 }
209 return isTerminated();
210 }
211
212 private static boolean isPowerOfTwo(int val) {
213 return (val & -val) == val;
214 }
215
216 private interface EventExecutorChooser {
217 EventExecutor next();
218 }
219
220 private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
221 @Override
222 public EventExecutor next() {
223 return children[childIndex.getAndIncrement() & children.length - 1];
224 }
225 }
226
227 private final class GenericEventExecutorChooser implements EventExecutorChooser {
228 @Override
229 public EventExecutor next() {
230 return children[Math.abs(childIndex.getAndIncrement() % children.length)];
231 }
232 }
233 }