1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.SystemPropertyUtil;
20 import io.netty.util.internal.ThreadExecutorMap;
21 import io.netty.util.internal.ThrowableUtil;
22 import io.netty.util.internal.logging.InternalLogger;
23 import io.netty.util.internal.logging.InternalLoggerFactory;
24
25 import org.jetbrains.annotations.Async.Schedule;
26
27 import java.security.AccessController;
28 import java.security.PrivilegedAction;
29 import java.util.Queue;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.RejectedExecutionException;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicBoolean;
37
38
39
40
41
42
43
44 public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
45 private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
46
47 private static final long SCHEDULE_QUIET_PERIOD_INTERVAL;
48
49 static {
50 int quietPeriod = SystemPropertyUtil.getInt("io.netty.globalEventExecutor.quietPeriodSeconds", 1);
51 if (quietPeriod <= 0) {
52 quietPeriod = 1;
53 }
54 logger.debug("-Dio.netty.globalEventExecutor.quietPeriodSeconds: {}", quietPeriod);
55
56 SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(quietPeriod);
57 }
58
59 public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
60
61 final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
62 final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
63 this, Executors.<Void>callable(new Runnable() {
64 @Override
65 public void run() {
66
67 }
68 }, null),
69
70
71 deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
72 -SCHEDULE_QUIET_PERIOD_INTERVAL
73 );
74
75
76
77
78
79 final ThreadFactory threadFactory;
80 private final TaskRunner taskRunner = new TaskRunner();
81 private final AtomicBoolean started = new AtomicBoolean();
82 volatile Thread thread;
83
84 private final Future<?> terminationFuture;
85
86 private GlobalEventExecutor() {
87 scheduleFromEventLoop(quietPeriodTask);
88 threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
89 DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
90
91 UnsupportedOperationException terminationFailure = new UnsupportedOperationException();
92 ThrowableUtil.unknownStackTrace(terminationFailure, GlobalEventExecutor.class, "terminationFuture");
93 terminationFuture = new FailedFuture<Object>(this, terminationFailure);
94 }
95
96
97
98
99
100
101 Runnable takeTask() {
102 BlockingQueue<Runnable> taskQueue = this.taskQueue;
103 for (;;) {
104 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
105 if (scheduledTask == null) {
106 Runnable task = null;
107 try {
108 task = taskQueue.take();
109 } catch (InterruptedException e) {
110
111 }
112 return task;
113 } else {
114 long delayNanos = scheduledTask.delayNanos();
115 Runnable task = null;
116 if (delayNanos > 0) {
117 try {
118 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
119 } catch (InterruptedException e) {
120
121 return null;
122 }
123 }
124 if (task == null) {
125
126
127
128
129 fetchFromScheduledTaskQueue();
130 task = taskQueue.poll();
131 }
132
133 if (task != null) {
134 return task;
135 }
136 }
137 }
138 }
139
140 private void fetchFromScheduledTaskQueue() {
141 long nanoTime = getCurrentTimeNanos();
142 ScheduledFutureTask scheduledTask;
143 while ((scheduledTask = (ScheduledFutureTask) pollScheduledTask(nanoTime)) != null) {
144 if (scheduledTask.isCancelled()) {
145 continue;
146 }
147 taskQueue.add(scheduledTask);
148 }
149 }
150
151
152
153
154 public int pendingTasks() {
155 return taskQueue.size();
156 }
157
158
159
160
161
162 private void addTask(Runnable task) {
163 taskQueue.add(ObjectUtil.checkNotNull(task, "task"));
164 }
165
166 @Override
167 public boolean inEventLoop(Thread thread) {
168 return thread == this.thread;
169 }
170
171 @Override
172 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
173 return terminationFuture();
174 }
175
176 @Override
177 public Future<?> terminationFuture() {
178 return terminationFuture;
179 }
180
181 @Override
182 @Deprecated
183 public void shutdown() {
184 throw new UnsupportedOperationException();
185 }
186
187 @Override
188 public boolean isShuttingDown() {
189 return false;
190 }
191
192 @Override
193 public boolean isShutdown() {
194 return false;
195 }
196
197 @Override
198 public boolean isTerminated() {
199 return false;
200 }
201
202 @Override
203 public boolean awaitTermination(long timeout, TimeUnit unit) {
204 return false;
205 }
206
207
208
209
210
211
212
213
214
215 public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
216 ObjectUtil.checkNotNull(unit, "unit");
217
218 final Thread thread = this.thread;
219 if (thread == null) {
220 throw new IllegalStateException("thread was not started");
221 }
222 thread.join(unit.toMillis(timeout));
223 return !thread.isAlive();
224 }
225
226 @Override
227 public void execute(Runnable task) {
228 execute0(task);
229 }
230
231 private void execute0(@Schedule Runnable task) {
232 addTask(ObjectUtil.checkNotNull(task, "task"));
233 if (!inEventLoop()) {
234 startThread();
235 }
236 }
237
238 private void startThread() {
239 if (started.compareAndSet(false, true)) {
240 final Thread callingThread = Thread.currentThread();
241 ClassLoader parentCCL = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
242 @Override
243 public ClassLoader run() {
244 return callingThread.getContextClassLoader();
245 }
246 });
247
248 setContextClassLoader(callingThread, null);
249 try {
250 final Thread t = threadFactory.newThread(taskRunner);
251
252
253
254
255
256 setContextClassLoader(t, null);
257
258
259
260
261 thread = t;
262 t.start();
263 } finally {
264 setContextClassLoader(callingThread, parentCCL);
265 }
266 }
267 }
268
269 private static void setContextClassLoader(final Thread t, final ClassLoader cl) {
270 AccessController.doPrivileged(new PrivilegedAction<Void>() {
271 @Override
272 public Void run() {
273 t.setContextClassLoader(cl);
274 return null;
275 }
276 });
277 }
278
279 final class TaskRunner implements Runnable {
280 @Override
281 public void run() {
282 for (;;) {
283 Runnable task = takeTask();
284 if (task != null) {
285 try {
286 runTask(task);
287 } catch (Throwable t) {
288 logger.warn("Unexpected exception from the global event executor: ", t);
289 }
290
291 if (task != quietPeriodTask) {
292 continue;
293 }
294 }
295
296 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
297
298 if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
299
300
301
302 boolean stopped = started.compareAndSet(true, false);
303 assert stopped;
304
305
306
307
308 if (taskQueue.isEmpty()) {
309
310
311
312
313 break;
314 }
315
316
317 if (!started.compareAndSet(false, true)) {
318
319
320 break;
321 }
322
323
324
325
326 }
327 }
328 }
329 }
330 }