1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.ThreadExecutorMap;
19 import io.netty5.util.internal.logging.InternalLogger;
20 import io.netty5.util.internal.logging.InternalLoggerFactory;
21 import org.jetbrains.annotations.Async.Execute;
22 import org.jetbrains.annotations.Async.Schedule;
23
24 import java.security.AccessController;
25 import java.security.PrivilegedAction;
26 import java.util.Queue;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicBoolean;
34
35 import static java.util.Objects.requireNonNull;
36
/**
 * Single-thread singleton {@link EventExecutor}. It starts the thread automatically and stops it when there is no
 * task pending in the task queue for 1 second. Please note that it is not scalable to schedule a large number of
 * tasks to this executor; use a dedicated executor instead.
 */
42 public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
43
44 private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
45
46 private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);
47
48 private final RunnableScheduledFutureAdapter<Void> quietPeriodTask;
49 public static final GlobalEventExecutor INSTANCE;
50
51 static {
52 INSTANCE = new GlobalEventExecutor();
53 }
54
55 private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
56
57 // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
58 // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
59 // be sticky about its thread group
60 // visible for testing
61 final ThreadFactory threadFactory;
62 private final TaskRunner taskRunner = new TaskRunner();
63 private final AtomicBoolean started = new AtomicBoolean();
64 volatile Thread thread;
65
66 private final Future<Void> terminationFuture = DefaultPromise.<Void>newFailedPromise(
67 this, new UnsupportedOperationException()).asFuture();
68
69 private GlobalEventExecutor() {
70 threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
71 DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
72 quietPeriodTask = new RunnableScheduledFutureAdapter<>(
73 this, newPromise(), Executors.callable(() -> {
74 // NOOP
75 }, null),
76 // note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise
77 // the method could be overridden leading to unsafe initialization here!
78 deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
79 -SCHEDULE_QUIET_PERIOD_INTERVAL);
80
81 scheduledTaskQueue().add(quietPeriodTask);
82 }
83
84 /**
85 * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
86 *
87 * @return {@code null} if the executor thread has been interrupted or waken up.
88 */
89 private Runnable takeTask() {
90 BlockingQueue<Runnable> taskQueue = this.taskQueue;
91 for (;;) {
92 RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
93 if (scheduledTask == null) {
94 Runnable task = null;
95 try {
96 task = taskQueue.take();
97 } catch (InterruptedException e) {
98 // Ignore
99 }
100 return task;
101 } else {
102 long delayNanos = scheduledTask.delayNanos();
103 Runnable task = null;
104 if (delayNanos > 0) {
105 try {
106 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
107 } catch (InterruptedException e) {
108 // Waken up.
109 return null;
110 }
111 }
112 if (task == null) {
113 // We need to fetch the scheduled tasks now as otherwise there may be a chance that
114 // scheduled tasks are never executed if there is always one task in the taskQueue.
115 // This is for example true for the read task of OIO Transport
116 // See https://github.com/netty/netty/issues/1614
117 fetchFromScheduledTaskQueue();
118 task = taskQueue.poll();
119 }
120
121 if (task != null) {
122 return task;
123 }
124 }
125 }
126 }
127
128 private void fetchFromScheduledTaskQueue() {
129 long nanoTime = getCurrentTimeNanos();
130 Runnable scheduledTask = pollScheduledTask(nanoTime);
131 while (scheduledTask != null) {
132 taskQueue.add(scheduledTask);
133 scheduledTask = pollScheduledTask(nanoTime);
134 }
135 }
136
137 /**
138 * Return the number of tasks that are pending for processing.
139 */
140 public int pendingTasks() {
141 return taskQueue.size();
142 }
143
144 /**
145 * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
146 * before.
147 */
148 private void addTask(Runnable task) {
149 requireNonNull(task, "task");
150 taskQueue.add(task);
151 }
152
153 @Override
154 public boolean inEventLoop(Thread thread) {
155 return thread == this.thread;
156 }
157
158 @Override
159 public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
160 return terminationFuture();
161 }
162
163 @Override
164 public Future<Void> terminationFuture() {
165 return terminationFuture;
166 }
167
168 @Override
169 public boolean isShuttingDown() {
170 return false;
171 }
172
173 @Override
174 public boolean isShutdown() {
175 return false;
176 }
177
178 @Override
179 public boolean isTerminated() {
180 return false;
181 }
182
183 @Override
184 public boolean awaitTermination(long timeout, TimeUnit unit) {
185 return false;
186 }
187
188 /**
189 * Waits until the worker thread of this executor has no tasks left in its task queue and terminates itself.
190 * Because a new worker thread will be started again when a new task is submitted, this operation is only useful
191 * when you want to ensure that the worker thread is terminated <strong>after</strong> your application is shut
192 * down and there's no chance of submitting a new task afterwards.
193 *
194 * @return {@code true} if and only if the worker thread has been terminated
195 */
196 public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
197 requireNonNull(unit, "unit");
198
199 final Thread thread = this.thread;
200 if (thread == null) {
201 throw new IllegalStateException("thread was not started");
202 }
203 thread.join(unit.toMillis(timeout));
204 return !thread.isAlive();
205 }
206
207 @Override
208 public void execute(@Schedule Runnable task) {
209 requireNonNull(task, "task");
210
211 addTask(task);
212 if (!inEventLoop()) {
213 startThread();
214 }
215 }
216
217 private void startThread() {
218 if (started.compareAndSet(false, true)) {
219 final Thread t = threadFactory.newThread(taskRunner);
220 // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
221 // classloader.
222 // See:
223 // - https://github.com/netty/netty/issues/7290
224 // - https://bugs.openjdk.java.net/browse/JDK-7008595
225 AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
226 t.setContextClassLoader(null);
227 return null;
228 });
229
230 // Set the thread before starting it as otherwise inEventLoop() may return false and so produce
231 // an assert error.
232 // See https://github.com/netty/netty/issues/4357
233 thread = t;
234 t.start();
235 }
236 }
237
238 final class TaskRunner implements Runnable {
239 @Override
240 public void run() {
241 for (;;) {
242 Runnable task = takeTask();
243 if (task != null) {
244 try {
245 runTask(task);
246 } catch (Throwable t) {
247 logger.warn("Unexpected exception from the global event executor: ", t);
248 }
249
250 if (task != quietPeriodTask) {
251 continue;
252 }
253 }
254
255 Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = scheduledTaskQueue();
256 // Terminate if there is no task in the queue (except the noop task).
257 if (taskQueue.isEmpty() && scheduledTaskQueue.size() <= 1) {
258 // Mark the current thread as stopped.
259 // The following CAS must always success and must be uncontended,
260 // because only one thread should be running at the same time.
261 boolean stopped = started.compareAndSet(true, false);
262 assert stopped;
263
264 // Do not check scheduledTaskQueue because it is not thread-safe and can only be mutated from a
265 // TaskRunner actively running tasks.
266 if (taskQueue.isEmpty()) {
267 // A) No new task was added and thus there's nothing to handle
268 // -> safe to terminate because there's nothing left to do
269 // B) A new thread started and handled all the new tasks.
270 // -> safe to terminate the new thread will take care the rest
271 break;
272 }
273
274 // There are pending tasks added again.
275 if (!started.compareAndSet(false, true)) {
276 // startThread() started a new thread and set 'started' to true.
277 // -> terminate this thread so that the new thread reads from taskQueue exclusively.
278 break;
279 }
280
281 // New tasks were added, but this worker was faster to set 'started' to true.
282 // i.e. a new worker thread was not started by startThread().
283 // -> keep this thread alive to handle the newly added entries.
284 }
285 }
286 }
287
288 private void runTask(@Execute Runnable task) {
289 task.run();
290 }
291 }
292 }