1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.UnstableApi;
21
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Queue;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.TimeoutException;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33
34
35
36
37
38
39
40 @UnstableApi
41 public final class NonStickyEventExecutorGroup implements EventExecutorGroup {
42 private final EventExecutorGroup group;
43 private final int maxTaskExecutePerRun;
44
45
46
47
48
49 public NonStickyEventExecutorGroup(EventExecutorGroup group) {
50 this(group, 1024);
51 }
52
53
54
55
56
57 public NonStickyEventExecutorGroup(EventExecutorGroup group, int maxTaskExecutePerRun) {
58 this.group = verify(group);
59 this.maxTaskExecutePerRun = ObjectUtil.checkPositive(maxTaskExecutePerRun, "maxTaskExecutePerRun");
60 }
61
62 private static EventExecutorGroup verify(EventExecutorGroup group) {
63 Iterator<EventExecutor> executors = ObjectUtil.checkNotNull(group, "group").iterator();
64 while (executors.hasNext()) {
65 EventExecutor executor = executors.next();
66 if (executor instanceof OrderedEventExecutor) {
67 throw new IllegalArgumentException("EventExecutorGroup " + group
68 + " contains OrderedEventExecutors: " + executor);
69 }
70 }
71 return group;
72 }
73
74 private NonStickyOrderedEventExecutor newExecutor(EventExecutor executor) {
75 return new NonStickyOrderedEventExecutor(executor, maxTaskExecutePerRun);
76 }
77
78 @Override
79 public boolean isShuttingDown() {
80 return group.isShuttingDown();
81 }
82
83 @Override
84 public Future<?> shutdownGracefully() {
85 return group.shutdownGracefully();
86 }
87
88 @Override
89 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
90 return group.shutdownGracefully(quietPeriod, timeout, unit);
91 }
92
93 @Override
94 public Future<?> terminationFuture() {
95 return group.terminationFuture();
96 }
97
98 @SuppressWarnings("deprecation")
99 @Override
100 public void shutdown() {
101 group.shutdown();
102 }
103
104 @SuppressWarnings("deprecation")
105 @Override
106 public List<Runnable> shutdownNow() {
107 return group.shutdownNow();
108 }
109
110 @Override
111 public EventExecutor next() {
112 return newExecutor(group.next());
113 }
114
115 @Override
116 public Iterator<EventExecutor> iterator() {
117 final Iterator<EventExecutor> itr = group.iterator();
118 return new Iterator<EventExecutor>() {
119 @Override
120 public boolean hasNext() {
121 return itr.hasNext();
122 }
123
124 @Override
125 public EventExecutor next() {
126 return newExecutor(itr.next());
127 }
128
129 @Override
130 public void remove() {
131 itr.remove();
132 }
133 };
134 }
135
136 @Override
137 public Future<?> submit(Runnable task) {
138 return group.submit(task);
139 }
140
141 @Override
142 public <T> Future<T> submit(Runnable task, T result) {
143 return group.submit(task, result);
144 }
145
146 @Override
147 public <T> Future<T> submit(Callable<T> task) {
148 return group.submit(task);
149 }
150
151 @Override
152 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
153 return group.schedule(command, delay, unit);
154 }
155
156 @Override
157 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
158 return group.schedule(callable, delay, unit);
159 }
160
161 @Override
162 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
163 return group.scheduleAtFixedRate(command, initialDelay, period, unit);
164 }
165
166 @Override
167 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
168 return group.scheduleWithFixedDelay(command, initialDelay, delay, unit);
169 }
170
171 @Override
172 public boolean isShutdown() {
173 return group.isShutdown();
174 }
175
176 @Override
177 public boolean isTerminated() {
178 return group.isTerminated();
179 }
180
181 @Override
182 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
183 return group.awaitTermination(timeout, unit);
184 }
185
186 @Override
187 public <T> List<java.util.concurrent.Future<T>> invokeAll(
188 Collection<? extends Callable<T>> tasks) throws InterruptedException {
189 return group.invokeAll(tasks);
190 }
191
192 @Override
193 public <T> List<java.util.concurrent.Future<T>> invokeAll(
194 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
195 return group.invokeAll(tasks, timeout, unit);
196 }
197
198 @Override
199 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
200 return group.invokeAny(tasks);
201 }
202
203 @Override
204 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
205 throws InterruptedException, ExecutionException, TimeoutException {
206 return group.invokeAny(tasks, timeout, unit);
207 }
208
209 @Override
210 public void execute(Runnable command) {
211 group.execute(command);
212 }
213
214 private static final class NonStickyOrderedEventExecutor extends AbstractEventExecutor
215 implements Runnable, OrderedEventExecutor {
216 private final EventExecutor executor;
217 private final Queue<Runnable> tasks = PlatformDependent.newMpscQueue();
218
219 private static final int NONE = 0;
220 private static final int SUBMITTED = 1;
221 private static final int RUNNING = 2;
222
223 private final AtomicInteger state = new AtomicInteger();
224 private final int maxTaskExecutePerRun;
225
226 NonStickyOrderedEventExecutor(EventExecutor executor, int maxTaskExecutePerRun) {
227 super(executor);
228 this.executor = executor;
229 this.maxTaskExecutePerRun = maxTaskExecutePerRun;
230 }
231
232 @Override
233 public void run() {
234 if (!state.compareAndSet(SUBMITTED, RUNNING)) {
235 return;
236 }
237 for (;;) {
238 int i = 0;
239 try {
240 for (; i < maxTaskExecutePerRun; i++) {
241 Runnable task = tasks.poll();
242 if (task == null) {
243 break;
244 }
245 safeExecute(task);
246 }
247 } finally {
248 if (i == maxTaskExecutePerRun) {
249 try {
250 state.set(SUBMITTED);
251 executor.execute(this);
252 return;
253 } catch (Throwable ignore) {
254
255 state.set(RUNNING);
256
257
258
259 }
260 } else {
261 state.set(NONE);
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277 if (tasks.isEmpty() || !state.compareAndSet(NONE, RUNNING)) {
278 return;
279 }
280 }
281 }
282 }
283 }
284
285 @Override
286 public boolean inEventLoop(Thread thread) {
287 return false;
288 }
289
290 @Override
291 public boolean inEventLoop() {
292 return false;
293 }
294
295 @Override
296 public boolean isShuttingDown() {
297 return executor.isShutdown();
298 }
299
300 @Override
301 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
302 return executor.shutdownGracefully(quietPeriod, timeout, unit);
303 }
304
305 @Override
306 public Future<?> terminationFuture() {
307 return executor.terminationFuture();
308 }
309
310 @Override
311 public void shutdown() {
312 executor.shutdown();
313 }
314
315 @Override
316 public boolean isShutdown() {
317 return executor.isShutdown();
318 }
319
320 @Override
321 public boolean isTerminated() {
322 return executor.isTerminated();
323 }
324
325 @Override
326 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
327 return executor.awaitTermination(timeout, unit);
328 }
329
330 @Override
331 public void execute(Runnable command) {
332 if (!tasks.offer(command)) {
333 throw new RejectedExecutionException();
334 }
335 if (state.compareAndSet(NONE, SUBMITTED)) {
336
337
338 executor.execute(this);
339 }
340 }
341 }
342 }