1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.logging.InternalLogger;
19 import io.netty.util.internal.logging.InternalLoggerFactory;
20 import org.jetbrains.annotations.NotNull;
21
22 import java.util.Collections;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.Delayed;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.RejectedExecutionHandler;
31 import java.util.concurrent.RunnableScheduledFuture;
32 import java.util.concurrent.ScheduledThreadPoolExecutor;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.TimeUnit;
35
36 import static java.util.concurrent.TimeUnit.NANOSECONDS;
37
38
39
40
41
42
43
44
45
46
47
48
49
50 @Deprecated
51 public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolExecutor implements EventExecutor {
52 private static final InternalLogger logger = InternalLoggerFactory.getInstance(
53 UnorderedThreadPoolEventExecutor.class);
54
55 private final Promise<?> terminationFuture = GlobalEventExecutor.INSTANCE.newPromise();
56 private final Set<EventExecutor> executorSet = Collections.singleton(this);
57 private final Set<Thread> eventLoopThreads = ConcurrentHashMap.newKeySet();
58
59
60
61
62
63 public UnorderedThreadPoolEventExecutor(int corePoolSize) {
64 this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class));
65 }
66
67
68
69
70 public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory) {
71 super(corePoolSize, threadFactory);
72 setThreadFactory(new AccountingThreadFactory(threadFactory, eventLoopThreads));
73 }
74
75
76
77
78
79 public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandler handler) {
80 this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class), handler);
81 }
82
83
84
85
86 public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory,
87 RejectedExecutionHandler handler) {
88 super(corePoolSize, threadFactory, handler);
89 setThreadFactory(new AccountingThreadFactory(threadFactory, eventLoopThreads));
90 }
91
92 @Override
93 public EventExecutor next() {
94 return this;
95 }
96
97 @Override
98 public EventExecutorGroup parent() {
99 return this;
100 }
101
102 @Override
103 public boolean inEventLoop() {
104 return inEventLoop(Thread.currentThread());
105 }
106
107 @Override
108 public boolean inEventLoop(Thread thread) {
109 return eventLoopThreads.contains(thread);
110 }
111
112 @Override
113 public <V> Promise<V> newPromise() {
114 return new DefaultPromise<V>(this);
115 }
116
117 @Override
118 public <V> ProgressivePromise<V> newProgressivePromise() {
119 return new DefaultProgressivePromise<V>(this);
120 }
121
122 @Override
123 public <V> Future<V> newSucceededFuture(V result) {
124 return new SucceededFuture<V>(this, result);
125 }
126
127 @Override
128 public <V> Future<V> newFailedFuture(Throwable cause) {
129 return new FailedFuture<V>(this, cause);
130 }
131
132 @Override
133 public boolean isShuttingDown() {
134 return isShutdown();
135 }
136
137 @Override
138 public List<Runnable> shutdownNow() {
139 List<Runnable> tasks = super.shutdownNow();
140 terminationFuture.trySuccess(null);
141 return tasks;
142 }
143
144 @Override
145 public void shutdown() {
146 super.shutdown();
147 terminationFuture.trySuccess(null);
148 }
149
150 @Override
151 public Future<?> shutdownGracefully() {
152 return shutdownGracefully(2, 15, TimeUnit.SECONDS);
153 }
154
155 @Override
156 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
157
158
159 shutdown();
160 return terminationFuture();
161 }
162
163 @Override
164 public Future<?> terminationFuture() {
165 return terminationFuture;
166 }
167
168 @Override
169 public Iterator<EventExecutor> iterator() {
170 return executorSet.iterator();
171 }
172
173 @Override
174 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
175 return runnable instanceof NonNotifyRunnable ?
176 task : new RunnableScheduledFutureTask<V>(this, task, false);
177 }
178
179 @Override
180 protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
181 return new RunnableScheduledFutureTask<V>(this, task, true);
182 }
183
184 @Override
185 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
186 return (ScheduledFuture<?>) super.schedule(command, delay, unit);
187 }
188
189 @Override
190 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
191 return (ScheduledFuture<V>) super.schedule(callable, delay, unit);
192 }
193
194 @Override
195 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
196 return (ScheduledFuture<?>) super.scheduleAtFixedRate(command, initialDelay, period, unit);
197 }
198
199 @Override
200 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
201 return (ScheduledFuture<?>) super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
202 }
203
204 @Override
205 public Future<?> submit(Runnable task) {
206 return (Future<?>) super.submit(task);
207 }
208
209 @Override
210 public <T> Future<T> submit(Runnable task, T result) {
211 return (Future<T>) super.submit(task, result);
212 }
213
214 @Override
215 public <T> Future<T> submit(Callable<T> task) {
216 return (Future<T>) super.submit(task);
217 }
218
219 @Override
220 public void execute(Runnable command) {
221 super.schedule(new NonNotifyRunnable(command), 0, NANOSECONDS);
222 }
223
224 private static final class RunnableScheduledFutureTask<V> extends PromiseTask<V>
225 implements RunnableScheduledFuture<V>, ScheduledFuture<V> {
226 private final RunnableScheduledFuture<V> future;
227 private final boolean wasCallable;
228
229 RunnableScheduledFutureTask(EventExecutor executor, RunnableScheduledFuture<V> future, boolean wasCallable) {
230 super(executor, future);
231 this.future = future;
232 this.wasCallable = wasCallable;
233 }
234
235 @Override
236 V runTask() throws Throwable {
237 V result = super.runTask();
238 if (result == null && wasCallable) {
239
240
241
242
243 assert future.isDone();
244 try {
245 return future.get();
246 } catch (ExecutionException e) {
247
248 throw e.getCause();
249 }
250 }
251 return result;
252 }
253
254 @Override
255 public void run() {
256 if (!isPeriodic()) {
257 super.run();
258 } else if (!isDone()) {
259 try {
260
261 runTask();
262 } catch (Throwable cause) {
263 if (!tryFailureInternal(cause)) {
264 logger.warn("Failure during execution of task", cause);
265 }
266 }
267 }
268 }
269
270 @Override
271 public boolean isPeriodic() {
272 return future.isPeriodic();
273 }
274
275 @Override
276 public long getDelay(TimeUnit unit) {
277 return future.getDelay(unit);
278 }
279
280 @Override
281 public int compareTo(Delayed o) {
282 return future.compareTo(o);
283 }
284 }
285
286
287
288
289
290
291
292
293 private static final class NonNotifyRunnable implements Runnable {
294
295 private final Runnable task;
296
297 NonNotifyRunnable(Runnable task) {
298 this.task = task;
299 }
300
301 @Override
302 public void run() {
303 task.run();
304 }
305 }
306
307 private static final class AccountingThreadFactory implements ThreadFactory {
308 private final ThreadFactory delegate;
309 private final Set<Thread> threads;
310
311 private AccountingThreadFactory(ThreadFactory delegate, Set<Thread> threads) {
312 this.delegate = delegate;
313 this.threads = threads;
314 }
315
316 @Override
317 public Thread newThread(@NotNull Runnable r) {
318 return delegate.newThread(() -> {
319 threads.add(Thread.currentThread());
320 try {
321 r.run();
322 } finally {
323 threads.remove(Thread.currentThread());
324 }
325 });
326 }
327 }
328 }