1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.PlatformDependent;
19 import io.netty5.util.internal.UnstableApi;
20
21 import java.util.Iterator;
22 import java.util.Queue;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import static io.netty5.util.internal.ObjectUtil.checkPositive;
29 import static java.util.Objects.requireNonNull;
30
31
32
33
34
35
36
37
38 @UnstableApi
39 public final class NonStickyEventExecutorGroup implements EventExecutorGroup {
40 private final EventExecutorGroup group;
41 private final int maxTaskExecutePerRun;
42
43
44
45
46
47 public NonStickyEventExecutorGroup(EventExecutorGroup group) {
48 this(group, 1024);
49 }
50
51
52
53
54
55 public NonStickyEventExecutorGroup(EventExecutorGroup group, int maxTaskExecutePerRun) {
56 this.group = verify(group);
57 this.maxTaskExecutePerRun = checkPositive(maxTaskExecutePerRun, "maxTaskExecutePerRun");
58 }
59
60 private static EventExecutorGroup verify(EventExecutorGroup group) {
61 Iterator<EventExecutor> executors = requireNonNull(group, "group").iterator();
62 while (executors.hasNext()) {
63 EventExecutor executor = executors.next();
64 if (executor instanceof OrderedEventExecutor) {
65 throw new IllegalArgumentException("EventExecutorGroup " + group
66 + " contains OrderedEventExecutors: " + executor);
67 }
68 }
69 return group;
70 }
71
72 private NonStickyOrderedEventExecutor newExecutor(EventExecutor executor) {
73 return new NonStickyOrderedEventExecutor(executor, maxTaskExecutePerRun);
74 }
75
76 @Override
77 public boolean isShuttingDown() {
78 return group.isShuttingDown();
79 }
80
81 @Override
82 public Future<Void> shutdownGracefully() {
83 return group.shutdownGracefully();
84 }
85
86 @Override
87 public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
88 return group.shutdownGracefully(quietPeriod, timeout, unit);
89 }
90
91 @Override
92 public Future<Void> terminationFuture() {
93 return group.terminationFuture();
94 }
95
96 @Override
97 public EventExecutor next() {
98 return newExecutor(group.next());
99 }
100
101 @Override
102 public Iterator<EventExecutor> iterator() {
103 final Iterator<EventExecutor> itr = group.iterator();
104 return new Iterator<>() {
105 @Override
106 public boolean hasNext() {
107 return itr.hasNext();
108 }
109
110 @Override
111 public EventExecutor next() {
112 return newExecutor(itr.next());
113 }
114
115 @Override
116 public void remove() {
117 itr.remove();
118 }
119 };
120 }
121
122 @Override
123 public Future<Void> submit(Runnable task) {
124 return group.submit(task);
125 }
126
127 @Override
128 public <T> Future<T> submit(Runnable task, T result) {
129 return group.submit(task, result);
130 }
131
132 @Override
133 public <T> Future<T> submit(Callable<T> task) {
134 return group.submit(task);
135 }
136
137 @Override
138 public Future<Void> schedule(Runnable task, long delay, TimeUnit unit) {
139 return group.schedule(task, delay, unit);
140 }
141
142 @Override
143 public <V> Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
144 return group.schedule(task, delay, unit);
145 }
146
147 @Override
148 public Future<Void> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
149 return group.scheduleAtFixedRate(task, initialDelay, period, unit);
150 }
151
152 @Override
153 public Future<Void> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
154 return group.scheduleWithFixedDelay(task, initialDelay, delay, unit);
155 }
156
157 @Override
158 public boolean isShutdown() {
159 return group.isShutdown();
160 }
161
162 @Override
163 public boolean isTerminated() {
164 return group.isTerminated();
165 }
166
167 @Override
168 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
169 return group.awaitTermination(timeout, unit);
170 }
171
172 @Override
173 public void execute(Runnable task) {
174 group.execute(task);
175 }
176
177 private static final class NonStickyOrderedEventExecutor extends AbstractEventExecutor
178 implements Runnable, OrderedEventExecutor {
179 private final EventExecutor executor;
180 private final Queue<Runnable> tasks = PlatformDependent.newMpscQueue();
181
182 private static final int NONE = 0;
183 private static final int SUBMITTED = 1;
184 private static final int RUNNING = 2;
185
186 private final AtomicInteger state = new AtomicInteger();
187 private final int maxTaskExecutePerRun;
188
189 NonStickyOrderedEventExecutor(EventExecutor executor, int maxTaskExecutePerRun) {
190 this.executor = executor;
191 this.maxTaskExecutePerRun = maxTaskExecutePerRun;
192 }
193
194 @Override
195 public void run() {
196 if (!state.compareAndSet(SUBMITTED, RUNNING)) {
197 return;
198 }
199 for (;;) {
200 int i = 0;
201 try {
202 for (; i < maxTaskExecutePerRun; i++) {
203 Runnable task = tasks.poll();
204 if (task == null) {
205 break;
206 }
207 safeExecute(task);
208 }
209 } finally {
210 if (i == maxTaskExecutePerRun) {
211 try {
212 state.set(SUBMITTED);
213 executor.execute(this);
214 return;
215 } catch (Throwable ignore) {
216
217 state.set(RUNNING);
218
219
220
221 }
222 } else {
223 state.set(NONE);
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239 if (tasks.isEmpty() || !state.compareAndSet(NONE, RUNNING)) {
240 return;
241 }
242 }
243 }
244 }
245 }
246
247 @Override
248 public boolean inEventLoop(Thread thread) {
249 return false;
250 }
251
252 @Override
253 public boolean isShuttingDown() {
254 return executor.isShutdown();
255 }
256
257 @Override
258 public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
259 return executor.shutdownGracefully(quietPeriod, timeout, unit);
260 }
261
262 @Override
263 public Future<Void> terminationFuture() {
264 return executor.terminationFuture();
265 }
266
267 @Override
268 public boolean isShutdown() {
269 return executor.isShutdown();
270 }
271
272 @Override
273 public boolean isTerminated() {
274 return executor.isTerminated();
275 }
276
277 @Override
278 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
279 return executor.awaitTermination(timeout, unit);
280 }
281
282 @Override
283 public void execute(Runnable task) {
284 if (!tasks.offer(task)) {
285 throw new RejectedExecutionException();
286 }
287 if (state.compareAndSet(NONE, SUBMITTED)) {
288
289
290 executor.execute(this);
291 }
292 }
293
294 @Override
295 public Future<Void> schedule(Runnable task, long delay,
296 TimeUnit unit) {
297 throw new UnsupportedOperationException();
298 }
299
300 @Override
301 public <V> Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
302 throw new UnsupportedOperationException();
303 }
304
305 @Override
306 public Future<Void> scheduleAtFixedRate(
307 Runnable task, long initialDelay, long period, TimeUnit unit) {
308 throw new UnsupportedOperationException();
309 }
310
311 @Override
312 public Future<Void> scheduleWithFixedDelay(
313 Runnable task, long initialDelay, long delay, TimeUnit unit) {
314 throw new UnsupportedOperationException();
315 }
316 }
317 }