1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.ThreadExecutorMap;
20 import io.netty.util.internal.logging.InternalLogger;
21 import io.netty.util.internal.logging.InternalLoggerFactory;
22
23 import org.jetbrains.annotations.Async.Schedule;
24
25 import java.security.AccessController;
26 import java.security.PrivilegedAction;
27 import java.util.Queue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicBoolean;
35
36
37
38
39
40
41 public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
42
private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);

// One second, expressed in nanoseconds. Used both as the initial delay and (negated) as the period of
// quietPeriodTask below.
private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);

// The single shared instance; the constructor is private, so no other instance can be created from outside.
public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();

// Tasks that are ready to run. Producers add to it from any thread; the worker thread takes from it.
final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
// A task that does nothing when run. It is placed into the scheduled task queue by the constructor and is
// what TaskRunner compares against to decide when to evaluate whether the worker thread may stop.
final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
        this, Executors.<Void>callable(new Runnable() {
    @Override
    public void run() {
        // Intentionally empty.
    }
}, null),
        // First deadline: one quiet-period interval from the current time.
        deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
        // NOTE(review): a negative period presumably selects fixed-delay repetition in ScheduledFutureTask;
        // confirm against that class.
        -SCHEDULE_QUIET_PERIOD_INTERVAL
);

// Factory used by startThread() to create the worker thread; assigned once in the constructor.
final ThreadFactory threadFactory;
// The loop body executed by the worker thread.
private final TaskRunner taskRunner = new TaskRunner();
// true while a worker thread is considered responsible for draining taskQueue; flipped only via CAS.
private final AtomicBoolean started = new AtomicBoolean();
// The most recently created worker thread, or null if none has been started yet. Volatile because it is
// written in startThread() and read from other threads (inEventLoop, awaitInactivity).
volatile Thread thread;

// An already-failed future carrying UnsupportedOperationException; returned by terminationFuture().
private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
73
74 private GlobalEventExecutor() {
75 scheduledTaskQueue().add(quietPeriodTask);
76 threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
77 DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
78 }
79
80
81
82
83
84
85 Runnable takeTask() {
86 BlockingQueue<Runnable> taskQueue = this.taskQueue;
87 for (;;) {
88 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
89 if (scheduledTask == null) {
90 Runnable task = null;
91 try {
92 task = taskQueue.take();
93 } catch (InterruptedException e) {
94
95 }
96 return task;
97 } else {
98 long delayNanos = scheduledTask.delayNanos();
99 Runnable task = null;
100 if (delayNanos > 0) {
101 try {
102 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
103 } catch (InterruptedException e) {
104
105 return null;
106 }
107 }
108 if (task == null) {
109
110
111
112
113 fetchFromScheduledTaskQueue();
114 task = taskQueue.poll();
115 }
116
117 if (task != null) {
118 return task;
119 }
120 }
121 }
122 }
123
124 private void fetchFromScheduledTaskQueue() {
125 long nanoTime = getCurrentTimeNanos();
126 Runnable scheduledTask = pollScheduledTask(nanoTime);
127 while (scheduledTask != null) {
128 taskQueue.add(scheduledTask);
129 scheduledTask = pollScheduledTask(nanoTime);
130 }
131 }
132
133
134
135
136 public int pendingTasks() {
137 return taskQueue.size();
138 }
139
140
141
142
143
144 private void addTask(Runnable task) {
145 taskQueue.add(ObjectUtil.checkNotNull(task, "task"));
146 }
147
148 @Override
149 public boolean inEventLoop(Thread thread) {
150 return thread == this.thread;
151 }
152
153 @Override
154 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
155 return terminationFuture();
156 }
157
158 @Override
159 public Future<?> terminationFuture() {
160 return terminationFuture;
161 }
162
163 @Override
164 @Deprecated
165 public void shutdown() {
166 throw new UnsupportedOperationException();
167 }
168
169 @Override
170 public boolean isShuttingDown() {
171 return false;
172 }
173
174 @Override
175 public boolean isShutdown() {
176 return false;
177 }
178
179 @Override
180 public boolean isTerminated() {
181 return false;
182 }
183
184 @Override
185 public boolean awaitTermination(long timeout, TimeUnit unit) {
186 return false;
187 }
188
189
190
191
192
193
194
195
196
197 public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
198 ObjectUtil.checkNotNull(unit, "unit");
199
200 final Thread thread = this.thread;
201 if (thread == null) {
202 throw new IllegalStateException("thread was not started");
203 }
204 thread.join(unit.toMillis(timeout));
205 return !thread.isAlive();
206 }
207
208 @Override
209 public void execute(Runnable task) {
210 execute0(task);
211 }
212
213 private void execute0(@Schedule Runnable task) {
214 addTask(ObjectUtil.checkNotNull(task, "task"));
215 if (!inEventLoop()) {
216 startThread();
217 }
218 }
219
220 private void startThread() {
221 if (started.compareAndSet(false, true)) {
222 final Thread t = threadFactory.newThread(taskRunner);
223
224
225
226
227
228 AccessController.doPrivileged(new PrivilegedAction<Void>() {
229 @Override
230 public Void run() {
231 t.setContextClassLoader(null);
232 return null;
233 }
234 });
235
236
237
238
239 thread = t;
240 t.start();
241 }
242 }
243
/**
 * The worker loop. Repeatedly takes a task via {@code takeTask()} and runs it; after running
 * {@code quietPeriodTask} (or when {@code takeTask()} returns {@code null}) it checks whether the
 * executor is idle and, if so, lets the thread exit.
 */
final class TaskRunner implements Runnable {
    @Override
    public void run() {
        for (;;) {
            Runnable task = takeTask();
            if (task != null) {
                try {
                    runTask(task);
                } catch (Throwable t) {
                    // A failing task must not kill the worker; log and carry on.
                    logger.warn("Unexpected exception from the global event executor: ", t);
                }

                // Only the quiet-period task triggers the idle check below; any other task just
                // loops back for more work.
                if (task != quietPeriodTask) {
                    continue;
                }
            }

            Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
            // Idle means: no runnable tasks, and the scheduled queue is absent or holds exactly one entry.
            // NOTE(review): that single entry is presumably quietPeriodTask, which the constructor adds -
            // confirm it is re-queued after each run.
            if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
                // Give up ownership of the "started" state. This thread is the one that holds it, so the
                // CAS is expected to succeed.
                boolean stopped = started.compareAndSet(true, false);
                assert stopped;

                // Re-check after releasing the flag. If the queue is still empty, either nothing was
                // submitted, or a concurrent execute() already started a new worker that drained it;
                // in both cases this thread can exit.
                if (taskQueue.isEmpty()) {
                    break;
                }

                // Tasks arrived in the meantime. Try to take ownership back.
                if (!started.compareAndSet(false, true)) {
                    // startThread() won the race and set the flag, so a new worker exists; exit and let
                    // it consume the queue.
                    break;
                }

                // This thread re-acquired the flag before any new worker was created: keep looping to
                // handle the newly added tasks.
            }
        }
    }
}
295 }