1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.SystemPropertyUtil;
20 import io.netty.util.internal.ThreadExecutorMap;
21 import io.netty.util.internal.ThrowableUtil;
22 import io.netty.util.internal.logging.InternalLogger;
23 import io.netty.util.internal.logging.InternalLoggerFactory;
24
25 import org.jetbrains.annotations.Async.Schedule;
26
27 import java.security.AccessController;
28 import java.security.PrivilegedAction;
29 import java.util.Queue;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.RejectedExecutionException;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicBoolean;
37
38
39
40
41
42
43
44 public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
45 private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
46
47 private static final long SCHEDULE_QUIET_PERIOD_INTERVAL;
48
49 static {
50 int quietPeriod = SystemPropertyUtil.getInt("io.netty.globalEventExecutor.quietPeriodSeconds", 1);
51 if (quietPeriod <= 0) {
52 quietPeriod = 1;
53 }
54 logger.debug("-Dio.netty.globalEventExecutor.quietPeriodSeconds: {}", quietPeriod);
55
56 SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(quietPeriod);
57 }
58
59 public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
60
61 final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
62 final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
63 this, Executors.<Void>callable(new Runnable() {
64 @Override
65 public void run() {
66
67 }
68 }, null),
69
70
71 deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
72 -SCHEDULE_QUIET_PERIOD_INTERVAL
73 );
74
75
76
77
78
79 final ThreadFactory threadFactory;
80 private final TaskRunner taskRunner = new TaskRunner();
81 private final AtomicBoolean started = new AtomicBoolean();
82 volatile Thread thread;
83
84 private final Future<?> terminationFuture;
85
86 private GlobalEventExecutor() {
87 scheduledTaskQueue().add(quietPeriodTask);
88 threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
89 DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
90
91 UnsupportedOperationException terminationFailure = new UnsupportedOperationException();
92 ThrowableUtil.unknownStackTrace(terminationFailure, GlobalEventExecutor.class, "terminationFuture");
93 terminationFuture = new FailedFuture<Object>(this, terminationFailure);
94 }
95
96
97
98
99
100
101 Runnable takeTask() {
102 BlockingQueue<Runnable> taskQueue = this.taskQueue;
103 for (;;) {
104 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
105 if (scheduledTask == null) {
106 Runnable task = null;
107 try {
108 task = taskQueue.take();
109 } catch (InterruptedException e) {
110
111 }
112 return task;
113 } else {
114 long delayNanos = scheduledTask.delayNanos();
115 Runnable task = null;
116 if (delayNanos > 0) {
117 try {
118 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
119 } catch (InterruptedException e) {
120
121 return null;
122 }
123 }
124 if (task == null) {
125
126
127
128
129 fetchFromScheduledTaskQueue();
130 task = taskQueue.poll();
131 }
132
133 if (task != null) {
134 return task;
135 }
136 }
137 }
138 }
139
140 private void fetchFromScheduledTaskQueue() {
141 long nanoTime = getCurrentTimeNanos();
142 Runnable scheduledTask = pollScheduledTask(nanoTime);
143 while (scheduledTask != null) {
144 taskQueue.add(scheduledTask);
145 scheduledTask = pollScheduledTask(nanoTime);
146 }
147 }
148
149
150
151
152 public int pendingTasks() {
153 return taskQueue.size();
154 }
155
156
157
158
159
160 private void addTask(Runnable task) {
161 taskQueue.add(ObjectUtil.checkNotNull(task, "task"));
162 }
163
164 @Override
165 public boolean inEventLoop(Thread thread) {
166 return thread == this.thread;
167 }
168
169 @Override
170 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
171 return terminationFuture();
172 }
173
174 @Override
175 public Future<?> terminationFuture() {
176 return terminationFuture;
177 }
178
179 @Override
180 @Deprecated
181 public void shutdown() {
182 throw new UnsupportedOperationException();
183 }
184
185 @Override
186 public boolean isShuttingDown() {
187 return false;
188 }
189
190 @Override
191 public boolean isShutdown() {
192 return false;
193 }
194
195 @Override
196 public boolean isTerminated() {
197 return false;
198 }
199
200 @Override
201 public boolean awaitTermination(long timeout, TimeUnit unit) {
202 return false;
203 }
204
205
206
207
208
209
210
211
212
213 public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
214 ObjectUtil.checkNotNull(unit, "unit");
215
216 final Thread thread = this.thread;
217 if (thread == null) {
218 throw new IllegalStateException("thread was not started");
219 }
220 thread.join(unit.toMillis(timeout));
221 return !thread.isAlive();
222 }
223
224 @Override
225 public void execute(Runnable task) {
226 execute0(task);
227 }
228
229 private void execute0(@Schedule Runnable task) {
230 addTask(ObjectUtil.checkNotNull(task, "task"));
231 if (!inEventLoop()) {
232 startThread();
233 }
234 }
235
236 private void startThread() {
237 if (started.compareAndSet(false, true)) {
238 final Thread callingThread = Thread.currentThread();
239 ClassLoader parentCCL = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
240 @Override
241 public ClassLoader run() {
242 return callingThread.getContextClassLoader();
243 }
244 });
245
246 setContextClassLoader(callingThread, null);
247 try {
248 final Thread t = threadFactory.newThread(taskRunner);
249
250
251
252
253
254 setContextClassLoader(t, null);
255
256
257
258
259 thread = t;
260 t.start();
261 } finally {
262 setContextClassLoader(callingThread, parentCCL);
263 }
264 }
265 }
266
267 private static void setContextClassLoader(final Thread t, final ClassLoader cl) {
268 AccessController.doPrivileged(new PrivilegedAction<Void>() {
269 @Override
270 public Void run() {
271 t.setContextClassLoader(cl);
272 return null;
273 }
274 });
275 }
276
277 final class TaskRunner implements Runnable {
278 @Override
279 public void run() {
280 for (;;) {
281 Runnable task = takeTask();
282 if (task != null) {
283 try {
284 runTask(task);
285 } catch (Throwable t) {
286 logger.warn("Unexpected exception from the global event executor: ", t);
287 }
288
289 if (task != quietPeriodTask) {
290 continue;
291 }
292 }
293
294 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
295
296 if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
297
298
299
300 boolean stopped = started.compareAndSet(true, false);
301 assert stopped;
302
303
304
305
306 if (taskQueue.isEmpty()) {
307
308
309
310
311 break;
312 }
313
314
315 if (!started.compareAndSet(false, true)) {
316
317
318 break;
319 }
320
321
322
323
324 }
325 }
326 }
327 }
328 }