1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.logging.InternalLogger;
19 import io.netty5.util.internal.logging.InternalLoggerFactory;
20
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.Delayed;
24 import java.util.concurrent.RejectedExecutionHandler;
25 import java.util.concurrent.RunnableScheduledFuture;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.TimeUnit;
29
30 import static java.util.concurrent.TimeUnit.NANOSECONDS;
31
32
33
34
35
36
37
38
39 @SuppressWarnings("unchecked")
40 public final class UnorderedThreadPoolEventExecutor implements EventExecutor {
41 private static final InternalLogger logger = InternalLoggerFactory.getInstance(
42 UnorderedThreadPoolEventExecutor.class);
43
44 private final Promise<Void> terminationFuture = GlobalEventExecutor.INSTANCE.newPromise();
45 private final InnerScheduledThreadPoolExecutor executor;
46
47
48
49
50
51 public UnorderedThreadPoolEventExecutor(int corePoolSize) {
52 DefaultThreadFactory threadFactory = new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class);
53 executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory);
54 }
55
56
57
58
59 public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory) {
60 executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory);
61 }
62
63
64
65
66
67 public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandler handler) {
68 DefaultThreadFactory threadFactory = new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class);
69 executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory, handler);
70 }
71
72
73
74
75 public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory,
76 RejectedExecutionHandler handler) {
77 executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory, handler);
78 }
79
80 @Override
81 public boolean inEventLoop(Thread thread) {
82 return false;
83 }
84
85 @Override
86 public boolean isShuttingDown() {
87 return isShutdown();
88 }
89
90 @Override
91 public boolean isShutdown() {
92 return executor.isShutdown();
93 }
94
95 @Override
96 public boolean isTerminated() {
97 return executor.isTerminated();
98 }
99
100 @Override
101 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
102 return executor.awaitTermination(timeout, unit);
103 }
104
105 @Override
106 public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
107
108
109 executor.shutdown();
110 return terminationFuture();
111 }
112
113 @Override
114 public Future<Void> terminationFuture() {
115 return terminationFuture.asFuture();
116 }
117
118 @Override
119 public Future<Void> schedule(Runnable task, long delay, TimeUnit unit) {
120 return (Future<Void>) executor.schedule(task, delay, unit);
121 }
122
123 @Override
124 public <V> Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
125 return (Future<V>) executor.schedule(task, delay, unit);
126 }
127
128 @Override
129 public Future<Void> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
130 return (Future<Void>) executor.scheduleAtFixedRate(task, initialDelay, period, unit);
131 }
132
133 @Override
134 public Future<Void> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
135 return (Future<Void>) executor.scheduleWithFixedDelay(task, initialDelay, delay, unit);
136 }
137
138 @Override
139 public Future<Void> submit(Runnable task) {
140 return (Future<Void>) executor.submit(task);
141 }
142
143 @Override
144 public <T> Future<T> submit(Runnable task, T result) {
145 return (Future<T>) executor.submit(task, result);
146 }
147
148 @Override
149 public <T> Future<T> submit(Callable<T> task) {
150 return (Future<T>) executor.submit(task);
151 }
152
153 @Override
154 public void execute(Runnable task) {
155 executor.schedule(new NonNotifyRunnable(task), 0, NANOSECONDS);
156 }
157
158
159
160
161
162
163
164
165 BlockingQueue<Runnable> getQueue() {
166 return executor.getQueue();
167 }
168
169
170
171
172 private static final class RunnableScheduledFutureTask<V> extends PromiseTask<V>
173 implements RunnableScheduledFuture<V> {
174 private final RunnableScheduledFuture<V> future;
175
176 RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable, RunnableScheduledFuture<V> future) {
177 super(executor, runnable, null);
178 this.future = future;
179 }
180
181 RunnableScheduledFutureTask(EventExecutor executor, Callable<V> callable, RunnableScheduledFuture<V> future) {
182 super(executor, callable);
183 this.future = future;
184 }
185
186 @Override
187 public void run() {
188 if (!isPeriodic()) {
189 super.run();
190 } else if (!isDone()) {
191 try {
192
193 future.run();
194 } catch (Throwable cause) {
195 if (!tryFailureInternal(cause)) {
196 logger.warn("Failure during execution of task", cause);
197 }
198 }
199 }
200 }
201
202 @Override
203 public boolean isPeriodic() {
204 return future.isPeriodic();
205 }
206
207 @Override
208 public long getDelay(TimeUnit unit) {
209 return future.getDelay(unit);
210 }
211
212 @Override
213 public int compareTo(Delayed o) {
214 return future.compareTo(o);
215 }
216 }
217
218
219
220
221
222
223
224
225 private static final class NonNotifyRunnable implements Runnable {
226
227 private final Runnable task;
228
229 NonNotifyRunnable(Runnable task) {
230 this.task = task;
231 }
232
233 @Override
234 public void run() {
235 task.run();
236 }
237 }
238
239 private static final class InnerScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
240 private final EventExecutor eventExecutor;
241
242 InnerScheduledThreadPoolExecutor(EventExecutor eventExecutor, int corePoolSize, ThreadFactory threadFactory) {
243 super(corePoolSize, threadFactory);
244 this.eventExecutor = eventExecutor;
245 }
246
247 InnerScheduledThreadPoolExecutor(EventExecutor eventExecutor, int corePoolSize, ThreadFactory threadFactory,
248 RejectedExecutionHandler handler) {
249 super(corePoolSize, threadFactory, handler);
250 this.eventExecutor = eventExecutor;
251 }
252
253 @Override
254 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
255 return runnable instanceof NonNotifyRunnable ?
256 task : new RunnableScheduledFutureTask<>(eventExecutor, runnable, task);
257 }
258
259 @Override
260 protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
261 return new RunnableScheduledFutureTask<>(eventExecutor, callable, task);
262 }
263 }
264 }