1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18
19 import io.netty.util.concurrent.AbstractEventExecutorGroup;
20 import io.netty.util.concurrent.DefaultPromise;
21 import io.netty.util.concurrent.EventExecutor;
22 import io.netty.util.concurrent.Future;
23 import io.netty.util.concurrent.FutureListener;
24 import io.netty.util.concurrent.GlobalEventExecutor;
25 import io.netty.util.concurrent.Promise;
26 import io.netty.util.internal.EmptyArrays;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.ReadOnlyIterator;
29 import io.netty.util.internal.ThrowableUtil;
30
31 import java.util.Collections;
32 import java.util.Iterator;
33 import java.util.Queue;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentLinkedQueue;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.RejectedExecutionException;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.TimeUnit;
40
41
42
43
44 public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
45
46 private final Object[] childArgs;
47 private final int maxChannels;
48 final ThreadFactory threadFactory;
49 final Set<ThreadPerChannelEventLoop> activeChildren =
50 Collections.newSetFromMap(PlatformDependent.<ThreadPerChannelEventLoop, Boolean>newConcurrentHashMap());
51 final Queue<ThreadPerChannelEventLoop> idleChildren = new ConcurrentLinkedQueue<ThreadPerChannelEventLoop>();
52 private final ChannelException tooManyChannels;
53
54 private volatile boolean shuttingDown;
55 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
56 private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
57 @Override
58 public void operationComplete(Future<Object> future) throws Exception {
59
60 if (isTerminated()) {
61 terminationFuture.trySuccess(null);
62 }
63 }
64 };
65
66
67
68
69 protected ThreadPerChannelEventLoopGroup() {
70 this(0);
71 }
72
73
74
75
76
77
78
79
80
81
82 protected ThreadPerChannelEventLoopGroup(int maxChannels) {
83 this(maxChannels, Executors.defaultThreadFactory());
84 }
85
86
87
88
89
90
91
92
93
94
95
96
97
98 protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
99 if (maxChannels < 0) {
100 throw new IllegalArgumentException(String.format(
101 "maxChannels: %d (expected: >= 0)", maxChannels));
102 }
103 if (threadFactory == null) {
104 throw new NullPointerException("threadFactory");
105 }
106
107 if (args == null) {
108 childArgs = EmptyArrays.EMPTY_OBJECTS;
109 } else {
110 childArgs = args.clone();
111 }
112
113 this.maxChannels = maxChannels;
114 this.threadFactory = threadFactory;
115
116 tooManyChannels = ThrowableUtil.unknownStackTrace(
117 new ChannelException("too many channels (max: " + maxChannels + ')'),
118 ThreadPerChannelEventLoopGroup.class, "nextChild()");
119 }
120
121
122
123
124 protected ThreadPerChannelEventLoop newChild(
125 @SuppressWarnings("UnusedParameters") Object... args) throws Exception {
126 return new ThreadPerChannelEventLoop(this);
127 }
128
129 @Override
130 public Iterator<EventExecutor> iterator() {
131 return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
132 }
133
134 @Override
135 public EventLoop next() {
136 throw new UnsupportedOperationException();
137 }
138
139 @Override
140 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
141 shuttingDown = true;
142
143 for (EventLoop l: activeChildren) {
144 l.shutdownGracefully(quietPeriod, timeout, unit);
145 }
146 for (EventLoop l: idleChildren) {
147 l.shutdownGracefully(quietPeriod, timeout, unit);
148 }
149
150
151 if (isTerminated()) {
152 terminationFuture.trySuccess(null);
153 }
154
155 return terminationFuture();
156 }
157
158 @Override
159 public Future<?> terminationFuture() {
160 return terminationFuture;
161 }
162
163 @Override
164 @Deprecated
165 public void shutdown() {
166 shuttingDown = true;
167
168 for (EventLoop l: activeChildren) {
169 l.shutdown();
170 }
171 for (EventLoop l: idleChildren) {
172 l.shutdown();
173 }
174
175
176 if (isTerminated()) {
177 terminationFuture.trySuccess(null);
178 }
179 }
180
181 @Override
182 public boolean isShuttingDown() {
183 for (EventLoop l: activeChildren) {
184 if (!l.isShuttingDown()) {
185 return false;
186 }
187 }
188 for (EventLoop l: idleChildren) {
189 if (!l.isShuttingDown()) {
190 return false;
191 }
192 }
193 return true;
194 }
195
196 @Override
197 public boolean isShutdown() {
198 for (EventLoop l: activeChildren) {
199 if (!l.isShutdown()) {
200 return false;
201 }
202 }
203 for (EventLoop l: idleChildren) {
204 if (!l.isShutdown()) {
205 return false;
206 }
207 }
208 return true;
209 }
210
211 @Override
212 public boolean isTerminated() {
213 for (EventLoop l: activeChildren) {
214 if (!l.isTerminated()) {
215 return false;
216 }
217 }
218 for (EventLoop l: idleChildren) {
219 if (!l.isTerminated()) {
220 return false;
221 }
222 }
223 return true;
224 }
225
226 @Override
227 public boolean awaitTermination(long timeout, TimeUnit unit)
228 throws InterruptedException {
229 long deadline = System.nanoTime() + unit.toNanos(timeout);
230 for (EventLoop l: activeChildren) {
231 for (;;) {
232 long timeLeft = deadline - System.nanoTime();
233 if (timeLeft <= 0) {
234 return isTerminated();
235 }
236 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
237 break;
238 }
239 }
240 }
241 for (EventLoop l: idleChildren) {
242 for (;;) {
243 long timeLeft = deadline - System.nanoTime();
244 if (timeLeft <= 0) {
245 return isTerminated();
246 }
247 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
248 break;
249 }
250 }
251 }
252 return isTerminated();
253 }
254
255 @Override
256 public ChannelFuture register(Channel channel) {
257 if (channel == null) {
258 throw new NullPointerException("channel");
259 }
260 try {
261 EventLoop l = nextChild();
262 return l.register(channel, new DefaultChannelPromise(channel, l));
263 } catch (Throwable t) {
264 return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
265 }
266 }
267
268 @Override
269 public ChannelFuture register(Channel channel, ChannelPromise promise) {
270 if (channel == null) {
271 throw new NullPointerException("channel");
272 }
273 try {
274 return nextChild().register(channel, promise);
275 } catch (Throwable t) {
276 promise.setFailure(t);
277 return promise;
278 }
279 }
280
281 private EventLoop nextChild() throws Exception {
282 if (shuttingDown) {
283 throw new RejectedExecutionException("shutting down");
284 }
285
286 ThreadPerChannelEventLoop loop = idleChildren.poll();
287 if (loop == null) {
288 if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
289 throw tooManyChannels;
290 }
291 loop = newChild(childArgs);
292 loop.terminationFuture().addListener(childTerminationListener);
293 }
294 activeChildren.add(loop);
295 return loop;
296 }
297 }