1 /*
2 * Copyright 2014 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16
17 package io.netty.util;
18
19 import io.netty.util.concurrent.DefaultThreadFactory;
20 import io.netty.util.internal.ObjectUtil;
21 import io.netty.util.internal.StringUtil;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.logging.InternalLogger;
24 import io.netty.util.internal.logging.InternalLoggerFactory;
25
26 import java.security.AccessController;
27 import java.security.PrivilegedAction;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.Queue;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicBoolean;
35
36 /**
37 * Checks if a thread is alive periodically and runs a task when a thread dies.
38 * <p>
39 * This thread starts a daemon thread to check the state of the threads being watched and to invoke their
40 * associated {@link Runnable}s. When there is no thread to watch (i.e. all threads are dead), the daemon thread
41 * will terminate itself, and a new daemon thread will be started again when a new watch is added.
42 * </p>
43 *
44 * @deprecated will be removed in the next major release
45 */
46 @Deprecated
47 public final class ThreadDeathWatcher {
48
49 private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class);
50 // visible for testing
51 static final ThreadFactory threadFactory;
52
53 // Use a MPMC queue as we may end up checking isEmpty() from multiple threads which may not be allowed to do
54 // concurrently depending on the implementation of it in a MPSC queue.
55 private static final Queue<Entry> pendingEntries = new ConcurrentLinkedQueue<Entry>();
56 private static final Watcher watcher = new Watcher();
57 private static final AtomicBoolean started = new AtomicBoolean();
58 private static volatile Thread watcherThread;
59
60 static {
61 String poolName = "threadDeathWatcher";
62 String serviceThreadPrefix = SystemPropertyUtil.get("io.netty.serviceThreadPrefix");
63 if (!StringUtil.isNullOrEmpty(serviceThreadPrefix)) {
64 poolName = serviceThreadPrefix + poolName;
65 }
66 // because the ThreadDeathWatcher is a singleton, tasks submitted to it can come from arbitrary threads and
67 // this can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory
68 // must not be sticky about its thread group
69 threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY, null);
70 }
71
72 /**
73 * Schedules the specified {@code task} to run when the specified {@code thread} dies.
74 *
75 * @param thread the {@link Thread} to watch
76 * @param task the {@link Runnable} to run when the {@code thread} dies
77 *
78 * @throws IllegalArgumentException if the specified {@code thread} is not alive
79 */
80 public static void watch(Thread thread, Runnable task) {
81 ObjectUtil.checkNotNull(thread, "thread");
82 ObjectUtil.checkNotNull(task, "task");
83
84 if (!thread.isAlive()) {
85 throw new IllegalArgumentException("thread must be alive.");
86 }
87
88 schedule(thread, task, true);
89 }
90
91 /**
92 * Cancels the task scheduled via {@link #watch(Thread, Runnable)}.
93 */
94 public static void unwatch(Thread thread, Runnable task) {
95 schedule(ObjectUtil.checkNotNull(thread, "thread"),
96 ObjectUtil.checkNotNull(task, "task"),
97 false);
98 }
99
100 private static void schedule(Thread thread, Runnable task, boolean isWatch) {
101 pendingEntries.add(new Entry(thread, task, isWatch));
102
103 if (started.compareAndSet(false, true)) {
104 final Thread watcherThread = threadFactory.newThread(watcher);
105 // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
106 // classloader.
107 // See:
108 // - https://github.com/netty/netty/issues/7290
109 // - https://bugs.openjdk.java.net/browse/JDK-7008595
110 AccessController.doPrivileged(new PrivilegedAction<Void>() {
111 @Override
112 public Void run() {
113 watcherThread.setContextClassLoader(null);
114 return null;
115 }
116 });
117
118 watcherThread.start();
119 ThreadDeathWatcher.watcherThread = watcherThread;
120 }
121 }
122
123 /**
124 * Waits until the thread of this watcher has no threads to watch and terminates itself.
125 * Because a new watcher thread will be started again on {@link #watch(Thread, Runnable)},
126 * this operation is only useful when you want to ensure that the watcher thread is terminated
127 * <strong>after</strong> your application is shut down and there's no chance of calling
128 * {@link #watch(Thread, Runnable)} afterwards.
129 *
130 * @return {@code true} if and only if the watcher thread has been terminated
131 */
132 public static boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
133 ObjectUtil.checkNotNull(unit, "unit");
134
135 Thread watcherThread = ThreadDeathWatcher.watcherThread;
136 if (watcherThread != null) {
137 watcherThread.join(unit.toMillis(timeout));
138 return !watcherThread.isAlive();
139 } else {
140 return true;
141 }
142 }
143
144 private ThreadDeathWatcher() { }
145
146 private static final class Watcher implements Runnable {
147
148 private final List<Entry> watchees = new ArrayList<Entry>();
149
150 @Override
151 public void run() {
152 for (;;) {
153 fetchWatchees();
154 notifyWatchees();
155
156 // Try once again just in case notifyWatchees() triggered watch() or unwatch().
157 fetchWatchees();
158 notifyWatchees();
159
160 try {
161 Thread.sleep(1000);
162 } catch (InterruptedException ignore) {
163 // Ignore the interrupt; do not terminate until all tasks are run.
164 }
165
166 if (watchees.isEmpty() && pendingEntries.isEmpty()) {
167
168 // Mark the current worker thread as stopped.
169 // The following CAS must always success and must be uncontended,
170 // because only one watcher thread should be running at the same time.
171 boolean stopped = started.compareAndSet(true, false);
172 assert stopped;
173
174 // Check if there are pending entries added by watch() while we do CAS above.
175 if (pendingEntries.isEmpty()) {
176 // A) watch() was not invoked and thus there's nothing to handle
177 // -> safe to terminate because there's nothing left to do
178 // B) a new watcher thread started and handled them all
179 // -> safe to terminate the new watcher thread will take care the rest
180 break;
181 }
182
183 // There are pending entries again, added by watch()
184 if (!started.compareAndSet(false, true)) {
185 // watch() started a new watcher thread and set 'started' to true.
186 // -> terminate this thread so that the new watcher reads from pendingEntries exclusively.
187 break;
188 }
189
190 // watch() added an entry, but this worker was faster to set 'started' to true.
191 // i.e. a new watcher thread was not started
192 // -> keep this thread alive to handle the newly added entries.
193 }
194 }
195 }
196
197 private void fetchWatchees() {
198 for (;;) {
199 Entry e = pendingEntries.poll();
200 if (e == null) {
201 break;
202 }
203
204 if (e.isWatch) {
205 watchees.add(e);
206 } else {
207 watchees.remove(e);
208 }
209 }
210 }
211
212 private void notifyWatchees() {
213 List<Entry> watchees = this.watchees;
214 for (int i = 0; i < watchees.size();) {
215 Entry e = watchees.get(i);
216 if (!e.thread.isAlive()) {
217 watchees.remove(i);
218 try {
219 e.task.run();
220 } catch (Throwable t) {
221 logger.warn("Thread death watcher task raised an exception:", t);
222 }
223 } else {
224 i ++;
225 }
226 }
227 }
228 }
229
230 private static final class Entry {
231 final Thread thread;
232 final Runnable task;
233 final boolean isWatch;
234
235 Entry(Thread thread, Runnable task, boolean isWatch) {
236 this.thread = thread;
237 this.task = task;
238 this.isWatch = isWatch;
239 }
240
241 @Override
242 public int hashCode() {
243 return thread.hashCode() ^ task.hashCode();
244 }
245
246 @Override
247 public boolean equals(Object obj) {
248 if (obj == this) {
249 return true;
250 }
251
252 if (!(obj instanceof Entry)) {
253 return false;
254 }
255
256 Entry that = (Entry) obj;
257 return thread == that.thread && task == that.task;
258 }
259 }
260 }