1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.SystemPropertyUtil;
20 import io.netty.util.internal.ThreadExecutorMap;
21 import io.netty.util.internal.ThrowableUtil;
22 import io.netty.util.internal.logging.InternalLogger;
23 import io.netty.util.internal.logging.InternalLoggerFactory;
24
25 import org.jetbrains.annotations.Async.Schedule;
26
27 import java.security.AccessController;
28 import java.security.PrivilegedAction;
29 import java.util.Queue;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.RejectedExecutionException;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicBoolean;
37
38
39
40
41
42
43
44 public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
45 private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
46
47 private static final long SCHEDULE_QUIET_PERIOD_INTERVAL;
48
49 static {
50 int quietPeriod = SystemPropertyUtil.getInt("io.netty.globalEventExecutor.quietPeriodSeconds", 1);
51 if (quietPeriod <= 0) {
52 quietPeriod = 1;
53 }
54 logger.debug("-Dio.netty.globalEventExecutor.quietPeriodSeconds: {}", quietPeriod);
55
56 SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(quietPeriod);
57 }
58
59 public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
60
61 final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
62 final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
63 this, Executors.<Void>callable(new Runnable() {
64 @Override
65 public void run() {
66
67 }
68 }, null),
69
70
71 deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
72 -SCHEDULE_QUIET_PERIOD_INTERVAL
73 );
74
75
76
77
78
79 final ThreadFactory threadFactory;
80 private final TaskRunner taskRunner = new TaskRunner();
81 private final AtomicBoolean started = new AtomicBoolean();
82 volatile Thread thread;
83
84 private final Future<?> terminationFuture;
85
86 private GlobalEventExecutor() {
87 scheduledTaskQueue().add(quietPeriodTask);
88 threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
89 DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
90
91 UnsupportedOperationException terminationFailure = new UnsupportedOperationException();
92 ThrowableUtil.unknownStackTrace(terminationFailure, GlobalEventExecutor.class, "terminationFuture");
93 terminationFuture = new FailedFuture<Object>(this, terminationFailure);
94 }
95
96
97
98
99
100
101 Runnable takeTask() {
102 BlockingQueue<Runnable> taskQueue = this.taskQueue;
103 for (;;) {
104 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
105 if (scheduledTask == null) {
106 Runnable task = null;
107 try {
108 task = taskQueue.take();
109 } catch (InterruptedException e) {
110
111 }
112 return task;
113 } else {
114 long delayNanos = scheduledTask.delayNanos();
115 Runnable task = null;
116 if (delayNanos > 0) {
117 try {
118 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
119 } catch (InterruptedException e) {
120
121 return null;
122 }
123 }
124 if (task == null) {
125
126
127
128
129 fetchFromScheduledTaskQueue();
130 task = taskQueue.poll();
131 }
132
133 if (task != null) {
134 return task;
135 }
136 }
137 }
138 }
139
140 private void fetchFromScheduledTaskQueue() {
141 long nanoTime = getCurrentTimeNanos();
142 Runnable scheduledTask = pollScheduledTask(nanoTime);
143 while (scheduledTask != null) {
144 taskQueue.add(scheduledTask);
145 scheduledTask = pollScheduledTask(nanoTime);
146 }
147 }
148
149
150
151
152 public int pendingTasks() {
153 return taskQueue.size();
154 }
155
156
157
158
159
160 private void addTask(Runnable task) {
161 taskQueue.add(ObjectUtil.checkNotNull(task, "task"));
162 }
163
164 @Override
165 public boolean inEventLoop(Thread thread) {
166 return thread == this.thread;
167 }
168
169 @Override
170 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
171 return terminationFuture();
172 }
173
174 @Override
175 public Future<?> terminationFuture() {
176 return terminationFuture;
177 }
178
179 @Override
180 @Deprecated
181 public void shutdown() {
182 throw new UnsupportedOperationException();
183 }
184
185 @Override
186 public boolean isShuttingDown() {
187 return false;
188 }
189
190 @Override
191 public boolean isShutdown() {
192 return false;
193 }
194
195 @Override
196 public boolean isTerminated() {
197 return false;
198 }
199
200 @Override
201 public boolean awaitTermination(long timeout, TimeUnit unit) {
202 return false;
203 }
204
205
206
207
208
209
210
211
212
213 public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
214 ObjectUtil.checkNotNull(unit, "unit");
215
216 final Thread thread = this.thread;
217 if (thread == null) {
218 throw new IllegalStateException("thread was not started");
219 }
220 thread.join(unit.toMillis(timeout));
221 return !thread.isAlive();
222 }
223
224 @Override
225 public void execute(Runnable task) {
226 execute0(task);
227 }
228
229 private void execute0(@Schedule Runnable task) {
230 addTask(ObjectUtil.checkNotNull(task, "task"));
231 if (!inEventLoop()) {
232 startThread();
233 }
234 }
235
236 private void startThread() {
237 if (started.compareAndSet(false, true)) {
238 Thread callingThread = Thread.currentThread();
239 ClassLoader parentCCL = callingThread.getContextClassLoader();
240
241 setContextClassLoader(callingThread, null);
242 try {
243 final Thread t = threadFactory.newThread(taskRunner);
244
245
246
247
248
249 setContextClassLoader(t, null);
250
251
252
253
254 thread = t;
255 t.start();
256 } finally {
257 setContextClassLoader(callingThread, parentCCL);
258 }
259 }
260 }
261
262 private static void setContextClassLoader(final Thread t, final ClassLoader cl) {
263 AccessController.doPrivileged(new PrivilegedAction<Void>() {
264 @Override
265 public Void run() {
266 t.setContextClassLoader(cl);
267 return null;
268 }
269 });
270 }
271
272 final class TaskRunner implements Runnable {
273 @Override
274 public void run() {
275 for (;;) {
276 Runnable task = takeTask();
277 if (task != null) {
278 try {
279 runTask(task);
280 } catch (Throwable t) {
281 logger.warn("Unexpected exception from the global event executor: ", t);
282 }
283
284 if (task != quietPeriodTask) {
285 continue;
286 }
287 }
288
289 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
290
291 if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
292
293
294
295 boolean stopped = started.compareAndSet(true, false);
296 assert stopped;
297
298
299
300
301 if (taskQueue.isEmpty()) {
302
303
304
305
306 break;
307 }
308
309
310 if (!started.compareAndSet(false, true)) {
311
312
313 break;
314 }
315
316
317
318
319 }
320 }
321 }
322 }
323 }