1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.logging.InternalLogger;
19 import io.netty.util.internal.logging.InternalLoggerFactory;
20
21 import java.util.Collections;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.Delayed;
27 import java.util.concurrent.RejectedExecutionHandler;
28 import java.util.concurrent.RunnableScheduledFuture;
29 import java.util.concurrent.ScheduledThreadPoolExecutor;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.TimeUnit;
32
33 import static java.util.concurrent.TimeUnit.NANOSECONDS;
34
35
36
37
38
39
40
41
42 public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolExecutor implements EventExecutor {
43 private static final InternalLogger logger = InternalLoggerFactory.getInstance(
44 UnorderedThreadPoolEventExecutor.class);
45
46 private final Promise<?> terminationFuture = GlobalEventExecutor.INSTANCE.newPromise();
47 private final Set<EventExecutor> executorSet = Collections.singleton((EventExecutor) this);
48
49
50
51
52
53 public UnorderedThreadPoolEventExecutor(int corePoolSize) {
54 this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class));
55 }
56
57
58
59
60 public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory) {
61 super(corePoolSize, threadFactory);
62 }
63
64
65
66
67
68 public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandler handler) {
69 this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class), handler);
70 }
71
72
73
74
75 public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory,
76 RejectedExecutionHandler handler) {
77 super(corePoolSize, threadFactory, handler);
78 }
79
80 @Override
81 public EventExecutor next() {
82 return this;
83 }
84
85 @Override
86 public EventExecutorGroup parent() {
87 return this;
88 }
89
90 @Override
91 public boolean inEventLoop() {
92 return false;
93 }
94
95 @Override
96 public boolean inEventLoop(Thread thread) {
97 return false;
98 }
99
100 @Override
101 public <V> Promise<V> newPromise() {
102 return new DefaultPromise<V>(this);
103 }
104
105 @Override
106 public <V> ProgressivePromise<V> newProgressivePromise() {
107 return new DefaultProgressivePromise<V>(this);
108 }
109
110 @Override
111 public <V> Future<V> newSucceededFuture(V result) {
112 return new SucceededFuture<V>(this, result);
113 }
114
115 @Override
116 public <V> Future<V> newFailedFuture(Throwable cause) {
117 return new FailedFuture<V>(this, cause);
118 }
119
120 @Override
121 public boolean isShuttingDown() {
122 return isShutdown();
123 }
124
125 @Override
126 public List<Runnable> shutdownNow() {
127 List<Runnable> tasks = super.shutdownNow();
128 terminationFuture.trySuccess(null);
129 return tasks;
130 }
131
132 @Override
133 public void shutdown() {
134 super.shutdown();
135 terminationFuture.trySuccess(null);
136 }
137
138 @Override
139 public Future<?> shutdownGracefully() {
140 return shutdownGracefully(2, 15, TimeUnit.SECONDS);
141 }
142
143 @Override
144 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
145
146
147 shutdown();
148 return terminationFuture();
149 }
150
151 @Override
152 public Future<?> terminationFuture() {
153 return terminationFuture;
154 }
155
156 @Override
157 public Iterator<EventExecutor> iterator() {
158 return executorSet.iterator();
159 }
160
161 @Override
162 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
163 return runnable instanceof NonNotifyRunnable ?
164 task : new RunnableScheduledFutureTask<V>(this, runnable, task);
165 }
166
167 @Override
168 protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
169 return new RunnableScheduledFutureTask<V>(this, callable, task);
170 }
171
172 @Override
173 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
174 return (ScheduledFuture<?>) super.schedule(command, delay, unit);
175 }
176
177 @Override
178 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
179 return (ScheduledFuture<V>) super.schedule(callable, delay, unit);
180 }
181
182 @Override
183 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
184 return (ScheduledFuture<?>) super.scheduleAtFixedRate(command, initialDelay, period, unit);
185 }
186
187 @Override
188 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
189 return (ScheduledFuture<?>) super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
190 }
191
192 @Override
193 public Future<?> submit(Runnable task) {
194 return (Future<?>) super.submit(task);
195 }
196
197 @Override
198 public <T> Future<T> submit(Runnable task, T result) {
199 return (Future<T>) super.submit(task, result);
200 }
201
202 @Override
203 public <T> Future<T> submit(Callable<T> task) {
204 return (Future<T>) super.submit(task);
205 }
206
207 @Override
208 public void execute(Runnable command) {
209 super.schedule(new NonNotifyRunnable(command), 0, NANOSECONDS);
210 }
211
212 private static final class RunnableScheduledFutureTask<V> extends PromiseTask<V>
213 implements RunnableScheduledFuture<V>, ScheduledFuture<V> {
214 private final RunnableScheduledFuture<V> future;
215
216 RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable,
217 RunnableScheduledFuture<V> future) {
218 super(executor, runnable, null);
219 this.future = future;
220 }
221
222 RunnableScheduledFutureTask(EventExecutor executor, Callable<V> callable,
223 RunnableScheduledFuture<V> future) {
224 super(executor, callable);
225 this.future = future;
226 }
227
228 @Override
229 public void run() {
230 if (!isPeriodic()) {
231 super.run();
232 } else if (!isDone()) {
233 try {
234
235 task.call();
236 } catch (Throwable cause) {
237 if (!tryFailureInternal(cause)) {
238 logger.warn("Failure during execution of task", cause);
239 }
240 }
241 }
242 }
243
244 @Override
245 public boolean isPeriodic() {
246 return future.isPeriodic();
247 }
248
249 @Override
250 public long getDelay(TimeUnit unit) {
251 return future.getDelay(unit);
252 }
253
254 @Override
255 public int compareTo(Delayed o) {
256 return future.compareTo(o);
257 }
258 }
259
260
261
262
263
264
265
266
267 private static final class NonNotifyRunnable implements Runnable {
268
269 private final Runnable task;
270
271 NonNotifyRunnable(Runnable task) {
272 this.task = task;
273 }
274
275 @Override
276 public void run() {
277 task.run();
278 }
279 }
280 }