1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19
20 import java.util.Collections;
21 import java.util.Iterator;
22 import java.util.LinkedHashSet;
23 import java.util.Set;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.ThreadFactory;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29
30
31
32
33 public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
34
35 private final EventExecutor[] children;
36 private final Set<EventExecutor> readonlyChildren;
37 private final AtomicInteger terminatedChildren = new AtomicInteger();
38 private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
39 private final EventExecutorChooserFactory.EventExecutorChooser chooser;
40
41
42
43
44
45
46
47
48 protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
49 this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
50 }
51
52
53
54
55
56
57
58
59 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
60 this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
61 }
62
63
64
65
66
67
68
69
70
71 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
72 EventExecutorChooserFactory chooserFactory, Object... args) {
73 checkPositive(nThreads, "nThreads");
74
75 if (executor == null) {
76 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
77 }
78
79 children = new EventExecutor[nThreads];
80
81 for (int i = 0; i < nThreads; i ++) {
82 boolean success = false;
83 try {
84 children[i] = newChild(executor, args);
85 success = true;
86 } catch (Exception e) {
87
88 throw new IllegalStateException("failed to create a child event loop", e);
89 } finally {
90 if (!success) {
91 for (int j = 0; j < i; j ++) {
92 children[j].shutdownGracefully();
93 }
94
95 for (int j = 0; j < i; j ++) {
96 EventExecutor e = children[j];
97 try {
98 while (!e.isTerminated()) {
99 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
100 }
101 } catch (InterruptedException interrupted) {
102
103 Thread.currentThread().interrupt();
104 break;
105 }
106 }
107 }
108 }
109 }
110
111 chooser = chooserFactory.newChooser(children);
112
113 final FutureListener<Object> terminationListener = new FutureListener<Object>() {
114 @Override
115 public void operationComplete(Future<Object> future) throws Exception {
116 if (terminatedChildren.incrementAndGet() == children.length) {
117 terminationFuture.setSuccess(null);
118 }
119 }
120 };
121
122 for (EventExecutor e: children) {
123 e.terminationFuture().addListener(terminationListener);
124 }
125
126 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
127 Collections.addAll(childrenSet, children);
128 readonlyChildren = Collections.unmodifiableSet(childrenSet);
129 }
130
131 protected ThreadFactory newDefaultThreadFactory() {
132 return new DefaultThreadFactory(getClass());
133 }
134
135 @Override
136 public EventExecutor next() {
137 return chooser.next();
138 }
139
140 @Override
141 public Iterator<EventExecutor> iterator() {
142 return readonlyChildren.iterator();
143 }
144
145
146
147
148
149 public final int executorCount() {
150 return children.length;
151 }
152
153
154
155
156
157
158 protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
159
160 @Override
161 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
162 for (EventExecutor l: children) {
163 l.shutdownGracefully(quietPeriod, timeout, unit);
164 }
165 return terminationFuture();
166 }
167
168 @Override
169 public Future<?> terminationFuture() {
170 return terminationFuture;
171 }
172
173 @Override
174 @Deprecated
175 public void shutdown() {
176 for (EventExecutor l: children) {
177 l.shutdown();
178 }
179 }
180
181 @Override
182 public boolean isShuttingDown() {
183 for (EventExecutor l: children) {
184 if (!l.isShuttingDown()) {
185 return false;
186 }
187 }
188 return true;
189 }
190
191 @Override
192 public boolean isShutdown() {
193 for (EventExecutor l: children) {
194 if (!l.isShutdown()) {
195 return false;
196 }
197 }
198 return true;
199 }
200
201 @Override
202 public boolean isTerminated() {
203 for (EventExecutor l: children) {
204 if (!l.isTerminated()) {
205 return false;
206 }
207 }
208 return true;
209 }
210
211 @Override
212 public boolean awaitTermination(long timeout, TimeUnit unit)
213 throws InterruptedException {
214 long deadline = System.nanoTime() + unit.toNanos(timeout);
215 loop: for (EventExecutor l: children) {
216 for (;;) {
217 long timeLeft = deadline - System.nanoTime();
218 if (timeLeft <= 0) {
219 break loop;
220 }
221 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
222 break;
223 }
224 }
225 }
226 return isTerminated();
227 }
228 }