1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18
19 import io.netty.util.concurrent.AbstractEventExecutorGroup;
20 import io.netty.util.concurrent.DefaultPromise;
21 import io.netty.util.concurrent.DefaultThreadFactory;
22 import io.netty.util.concurrent.EventExecutor;
23 import io.netty.util.concurrent.Future;
24 import io.netty.util.concurrent.FutureListener;
25 import io.netty.util.concurrent.GlobalEventExecutor;
26 import io.netty.util.concurrent.Promise;
27 import io.netty.util.concurrent.ThreadPerTaskExecutor;
28 import io.netty.util.internal.EmptyArrays;
29 import io.netty.util.internal.ObjectUtil;
30 import io.netty.util.internal.ReadOnlyIterator;
31
32 import java.util.Collections;
33 import java.util.Iterator;
34 import java.util.Queue;
35 import java.util.Set;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ConcurrentLinkedQueue;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.TimeUnit;
42
43
44
45
46
47
48 @Deprecated
49 public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
50
51 private final Object[] childArgs;
52 private final int maxChannels;
53 final Executor executor;
54 final Set<EventLoop> activeChildren = ConcurrentHashMap.newKeySet();
55 final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<>();
56 private final ChannelException tooManyChannels;
57
58 private volatile boolean shuttingDown;
59 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
60 private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
61 @Override
62 public void operationComplete(Future<Object> future) throws Exception {
63
64 if (isTerminated()) {
65 terminationFuture.trySuccess(null);
66 }
67 }
68 };
69
70
71
72
73 protected ThreadPerChannelEventLoopGroup() {
74 this(0);
75 }
76
77
78
79
80
81
82
83
84
85
86 protected ThreadPerChannelEventLoopGroup(int maxChannels) {
87 this(maxChannels, (ThreadFactory) null);
88 }
89
90
91
92
93
94
95
96
97
98
99
100
101
102 protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
103 this(maxChannels, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
104 }
105
106
107
108
109
110
111
112
113
114
115
116
117
118 protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {
119 ObjectUtil.checkPositiveOrZero(maxChannels, "maxChannels");
120 if (executor == null) {
121 executor = new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass()));
122 }
123
124 if (args == null) {
125 childArgs = EmptyArrays.EMPTY_OBJECTS;
126 } else {
127 childArgs = args.clone();
128 }
129
130 this.maxChannels = maxChannels;
131 this.executor = executor;
132
133 tooManyChannels =
134 ChannelException.newStatic("too many channels (max: " + maxChannels + ')',
135 ThreadPerChannelEventLoopGroup.class, "nextChild()");
136 }
137
138
139
140
141 protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
142 return new ThreadPerChannelEventLoop(this);
143 }
144
145 @Override
146 public Iterator<EventExecutor> iterator() {
147 return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
148 }
149
150 @Override
151 public EventLoop next() {
152 throw new UnsupportedOperationException();
153 }
154
155 @Override
156 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
157 shuttingDown = true;
158
159 for (EventLoop l: activeChildren) {
160 l.shutdownGracefully(quietPeriod, timeout, unit);
161 }
162 for (EventLoop l: idleChildren) {
163 l.shutdownGracefully(quietPeriod, timeout, unit);
164 }
165
166
167 if (isTerminated()) {
168 terminationFuture.trySuccess(null);
169 }
170
171 return terminationFuture();
172 }
173
174 @Override
175 public Future<?> terminationFuture() {
176 return terminationFuture;
177 }
178
179 @Override
180 @Deprecated
181 public void shutdown() {
182 shuttingDown = true;
183
184 for (EventLoop l: activeChildren) {
185 l.shutdown();
186 }
187 for (EventLoop l: idleChildren) {
188 l.shutdown();
189 }
190
191
192 if (isTerminated()) {
193 terminationFuture.trySuccess(null);
194 }
195 }
196
197 @Override
198 public boolean isShuttingDown() {
199 for (EventLoop l: activeChildren) {
200 if (!l.isShuttingDown()) {
201 return false;
202 }
203 }
204 for (EventLoop l: idleChildren) {
205 if (!l.isShuttingDown()) {
206 return false;
207 }
208 }
209 return true;
210 }
211
212 @Override
213 public boolean isShutdown() {
214 for (EventLoop l: activeChildren) {
215 if (!l.isShutdown()) {
216 return false;
217 }
218 }
219 for (EventLoop l: idleChildren) {
220 if (!l.isShutdown()) {
221 return false;
222 }
223 }
224 return true;
225 }
226
227 @Override
228 public boolean isTerminated() {
229 for (EventLoop l: activeChildren) {
230 if (!l.isTerminated()) {
231 return false;
232 }
233 }
234 for (EventLoop l: idleChildren) {
235 if (!l.isTerminated()) {
236 return false;
237 }
238 }
239 return true;
240 }
241
242 @Override
243 public boolean awaitTermination(long timeout, TimeUnit unit)
244 throws InterruptedException {
245 long deadline = System.nanoTime() + unit.toNanos(timeout);
246 for (EventLoop l: activeChildren) {
247 for (;;) {
248 long timeLeft = deadline - System.nanoTime();
249 if (timeLeft <= 0) {
250 return isTerminated();
251 }
252 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
253 break;
254 }
255 }
256 }
257 for (EventLoop l: idleChildren) {
258 for (;;) {
259 long timeLeft = deadline - System.nanoTime();
260 if (timeLeft <= 0) {
261 return isTerminated();
262 }
263 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
264 break;
265 }
266 }
267 }
268 return isTerminated();
269 }
270
271 @Override
272 public ChannelFuture register(Channel channel) {
273 ObjectUtil.checkNotNull(channel, "channel");
274 try {
275 EventLoop l = nextChild();
276 return l.register(new DefaultChannelPromise(channel, l));
277 } catch (Throwable t) {
278 return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
279 }
280 }
281
282 @Override
283 public ChannelFuture register(ChannelPromise promise) {
284 try {
285 return nextChild().register(promise);
286 } catch (Throwable t) {
287 promise.setFailure(t);
288 return promise;
289 }
290 }
291
292 @Deprecated
293 @Override
294 public ChannelFuture register(Channel channel, ChannelPromise promise) {
295 ObjectUtil.checkNotNull(channel, "channel");
296 try {
297 return nextChild().register(channel, promise);
298 } catch (Throwable t) {
299 promise.setFailure(t);
300 return promise;
301 }
302 }
303
304 private EventLoop nextChild() throws Exception {
305 if (shuttingDown) {
306 throw new RejectedExecutionException("shutting down");
307 }
308
309 EventLoop loop = idleChildren.poll();
310 if (loop == null) {
311 if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
312 throw tooManyChannels;
313 }
314 loop = newChild(childArgs);
315 loop.terminationFuture().addListener(childTerminationListener);
316 }
317 activeChildren.add(loop);
318 return loop;
319 }
320 }