1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.logging.InternalLogger;
19 import io.netty.util.internal.logging.InternalLoggerFactory;
20
21 import java.security.AccessController;
22 import java.security.PrivilegedAction;
23 import java.util.Queue;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.RejectedExecutionException;
28 import java.util.concurrent.ThreadFactory;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31
32
33
34
35
36
37 public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
38
39 private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
40
41 private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);
42
43 public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
44
45 final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
46 final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
47 this, Executors.<Void>callable(new Runnable() {
48 @Override
49 public void run() {
50
51 }
52 }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
53
54
55
56
57
58 final ThreadFactory threadFactory =
59 new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null);
60 private final TaskRunner taskRunner = new TaskRunner();
61 private final AtomicBoolean started = new AtomicBoolean();
62 volatile Thread thread;
63
64 private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
65
66 private GlobalEventExecutor() {
67 scheduledTaskQueue().add(quietPeriodTask);
68 }
69
70 @Override
71 public EventExecutorGroup parent() {
72 return null;
73 }
74
75
76
77
78
79
80 Runnable takeTask() {
81 BlockingQueue<Runnable> taskQueue = this.taskQueue;
82 for (;;) {
83 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
84 if (scheduledTask == null) {
85 Runnable task = null;
86 try {
87 task = taskQueue.take();
88 } catch (InterruptedException e) {
89
90 }
91 return task;
92 } else {
93 long delayNanos = scheduledTask.delayNanos();
94 Runnable task;
95 if (delayNanos > 0) {
96 try {
97 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
98 } catch (InterruptedException e) {
99 return null;
100 }
101 } else {
102 task = taskQueue.poll();
103 }
104
105 if (task == null) {
106 fetchFromScheduledTaskQueue();
107 task = taskQueue.poll();
108 }
109
110 if (task != null) {
111 return task;
112 }
113 }
114 }
115 }
116
117 private void fetchFromScheduledTaskQueue() {
118 long nanoTime = AbstractScheduledEventExecutor.nanoTime();
119 Runnable scheduledTask = pollScheduledTask(nanoTime);
120 while (scheduledTask != null) {
121 taskQueue.add(scheduledTask);
122 scheduledTask = pollScheduledTask(nanoTime);
123 }
124 }
125
126
127
128
129
130
131
132 public int pendingTasks() {
133 return taskQueue.size();
134 }
135
136
137
138
139
140 private void addTask(Runnable task) {
141 if (task == null) {
142 throw new NullPointerException("task");
143 }
144 taskQueue.add(task);
145 }
146
147 @Override
148 public boolean inEventLoop(Thread thread) {
149 return thread == this.thread;
150 }
151
152 @Override
153 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
154 return terminationFuture();
155 }
156
157 @Override
158 public Future<?> terminationFuture() {
159 return terminationFuture;
160 }
161
162 @Override
163 @Deprecated
164 public void shutdown() {
165 throw new UnsupportedOperationException();
166 }
167
168 @Override
169 public boolean isShuttingDown() {
170 return false;
171 }
172
173 @Override
174 public boolean isShutdown() {
175 return false;
176 }
177
178 @Override
179 public boolean isTerminated() {
180 return false;
181 }
182
183 @Override
184 public boolean awaitTermination(long timeout, TimeUnit unit) {
185 return false;
186 }
187
188
189
190
191
192
193
194
195
196 public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
197 if (unit == null) {
198 throw new NullPointerException("unit");
199 }
200
201 final Thread thread = this.thread;
202 if (thread == null) {
203 throw new IllegalStateException("thread was not started");
204 }
205 thread.join(unit.toMillis(timeout));
206 return !thread.isAlive();
207 }
208
209 @Override
210 public void execute(Runnable task) {
211 if (task == null) {
212 throw new NullPointerException("task");
213 }
214
215 addTask(task);
216 if (!inEventLoop()) {
217 startThread();
218 }
219 }
220
221 private void startThread() {
222 if (started.compareAndSet(false, true)) {
223 final Thread t = threadFactory.newThread(taskRunner);
224
225
226
227
228
229 AccessController.doPrivileged(new PrivilegedAction<Void>() {
230 @Override
231 public Void run() {
232 t.setContextClassLoader(null);
233 return null;
234 }
235 });
236
237
238
239
240 thread = t;
241 t.start();
242 }
243 }
244
245 final class TaskRunner implements Runnable {
246 @Override
247 public void run() {
248 for (;;) {
249 Runnable task = takeTask();
250 if (task != null) {
251 try {
252 task.run();
253 } catch (Throwable t) {
254 logger.warn("Unexpected exception from the global event executor: ", t);
255 }
256
257 if (task != quietPeriodTask) {
258 continue;
259 }
260 }
261
262 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
263
264 if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
265
266
267
268 boolean stopped = started.compareAndSet(true, false);
269 assert stopped;
270
271
272 if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
273
274
275
276
277 break;
278 }
279
280
281 if (!started.compareAndSet(false, true)) {
282
283
284 break;
285 }
286
287
288
289
290 }
291 }
292 }
293 }
294 }