1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.ThreadExecutorMap;
19 import io.netty5.util.internal.logging.InternalLogger;
20 import io.netty5.util.internal.logging.InternalLoggerFactory;
21 import org.jetbrains.annotations.Async.Execute;
22 import org.jetbrains.annotations.Async.Schedule;
23
24 import java.security.AccessController;
25 import java.security.PrivilegedAction;
26 import java.util.Queue;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicBoolean;
34
35 import static java.util.Objects.requireNonNull;
36
37
38
39
40
41
42 public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
43
44 private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
45
46 private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);
47
48 private final RunnableScheduledFutureAdapter<Void> quietPeriodTask;
49 public static final GlobalEventExecutor INSTANCE;
50
51 static {
52 INSTANCE = new GlobalEventExecutor();
53 }
54
55 private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
56
57
58
59
60
61 final ThreadFactory threadFactory;
62 private final TaskRunner taskRunner = new TaskRunner();
63 private final AtomicBoolean started = new AtomicBoolean();
64 volatile Thread thread;
65
66 private final Future<Void> terminationFuture = DefaultPromise.<Void>newFailedPromise(
67 this, new UnsupportedOperationException()).asFuture();
68
69 private GlobalEventExecutor() {
70 threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
71 DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
72 quietPeriodTask = new RunnableScheduledFutureAdapter<>(
73 this, newPromise(), Executors.callable(() -> {
74
75 }, null),
76
77
78 deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
79 -SCHEDULE_QUIET_PERIOD_INTERVAL);
80
81 scheduledTaskQueue().add(quietPeriodTask);
82 }
83
84
85
86
87
88
89 private Runnable takeTask() {
90 BlockingQueue<Runnable> taskQueue = this.taskQueue;
91 for (;;) {
92 RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
93 if (scheduledTask == null) {
94 Runnable task = null;
95 try {
96 task = taskQueue.take();
97 } catch (InterruptedException e) {
98
99 }
100 return task;
101 } else {
102 long delayNanos = scheduledTask.delayNanos();
103 Runnable task = null;
104 if (delayNanos > 0) {
105 try {
106 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
107 } catch (InterruptedException e) {
108
109 return null;
110 }
111 }
112 if (task == null) {
113
114
115
116
117 fetchFromScheduledTaskQueue();
118 task = taskQueue.poll();
119 }
120
121 if (task != null) {
122 return task;
123 }
124 }
125 }
126 }
127
128 private void fetchFromScheduledTaskQueue() {
129 long nanoTime = getCurrentTimeNanos();
130 Runnable scheduledTask = pollScheduledTask(nanoTime);
131 while (scheduledTask != null) {
132 taskQueue.add(scheduledTask);
133 scheduledTask = pollScheduledTask(nanoTime);
134 }
135 }
136
137
138
139
140 public int pendingTasks() {
141 return taskQueue.size();
142 }
143
144
145
146
147
148 private void addTask(Runnable task) {
149 requireNonNull(task, "task");
150 taskQueue.add(task);
151 }
152
153 @Override
154 public boolean inEventLoop(Thread thread) {
155 return thread == this.thread;
156 }
157
158 @Override
159 public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
160 return terminationFuture();
161 }
162
163 @Override
164 public Future<Void> terminationFuture() {
165 return terminationFuture;
166 }
167
168 @Override
169 public boolean isShuttingDown() {
170 return false;
171 }
172
173 @Override
174 public boolean isShutdown() {
175 return false;
176 }
177
178 @Override
179 public boolean isTerminated() {
180 return false;
181 }
182
183 @Override
184 public boolean awaitTermination(long timeout, TimeUnit unit) {
185 return false;
186 }
187
188
189
190
191
192
193
194
195
196 public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
197 requireNonNull(unit, "unit");
198
199 final Thread thread = this.thread;
200 if (thread == null) {
201 throw new IllegalStateException("thread was not started");
202 }
203 thread.join(unit.toMillis(timeout));
204 return !thread.isAlive();
205 }
206
207 @Override
208 public void execute(@Schedule Runnable task) {
209 requireNonNull(task, "task");
210
211 addTask(task);
212 if (!inEventLoop()) {
213 startThread();
214 }
215 }
216
217 private void startThread() {
218 if (started.compareAndSet(false, true)) {
219 final Thread t = threadFactory.newThread(taskRunner);
220
221
222
223
224
225 AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
226 t.setContextClassLoader(null);
227 return null;
228 });
229
230
231
232
233 thread = t;
234 t.start();
235 }
236 }
237
238 final class TaskRunner implements Runnable {
239 @Override
240 public void run() {
241 for (;;) {
242 Runnable task = takeTask();
243 if (task != null) {
244 try {
245 runTask(task);
246 } catch (Throwable t) {
247 logger.warn("Unexpected exception from the global event executor: ", t);
248 }
249
250 if (task != quietPeriodTask) {
251 continue;
252 }
253 }
254
255 Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = scheduledTaskQueue();
256
257 if (taskQueue.isEmpty() && scheduledTaskQueue.size() <= 1) {
258
259
260
261 boolean stopped = started.compareAndSet(true, false);
262 assert stopped;
263
264
265
266 if (taskQueue.isEmpty()) {
267
268
269
270
271 break;
272 }
273
274
275 if (!started.compareAndSet(false, true)) {
276
277
278 break;
279 }
280
281
282
283
284 }
285 }
286 }
287
288 private void runTask(@Execute Runnable task) {
289 task.run();
290 }
291 }
292 }