View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.util;
18  
19  import io.netty.util.concurrent.DefaultThreadFactory;
20  import io.netty.util.internal.ObjectUtil;
21  import io.netty.util.internal.StringUtil;
22  import io.netty.util.internal.SystemPropertyUtil;
23  import io.netty.util.internal.logging.InternalLogger;
24  import io.netty.util.internal.logging.InternalLoggerFactory;
25  
26  import java.security.AccessController;
27  import java.security.PrivilegedAction;
28  import java.util.ArrayList;
29  import java.util.List;
30  import java.util.Queue;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  
36  /**
37   * Checks if a thread is alive periodically and runs a task when a thread dies.
38   * <p>
39   * This thread starts a daemon thread to check the state of the threads being watched and to invoke their
40   * associated {@link Runnable}s.  When there is no thread to watch (i.e. all threads are dead), the daemon thread
41   * will terminate itself, and a new daemon thread will be started again when a new watch is added.
42   * </p>
43   *
44   * @deprecated will be removed in the next major release
45   */
46  @Deprecated
47  public final class ThreadDeathWatcher {
48  
49      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class);
50      // visible for testing
51      static final ThreadFactory threadFactory;
52  
53      // Use a MPMC queue as we may end up checking isEmpty() from multiple threads which may not be allowed to do
54      // concurrently depending on the implementation of it in a MPSC queue.
55      private static final Queue<Entry> pendingEntries = new ConcurrentLinkedQueue<Entry>();
56      private static final Watcher watcher = new Watcher();
57      private static final AtomicBoolean started = new AtomicBoolean();
58      private static volatile Thread watcherThread;
59  
60      static {
61          String poolName = "threadDeathWatcher";
62          String serviceThreadPrefix = SystemPropertyUtil.get("io.netty.serviceThreadPrefix");
63          if (!StringUtil.isNullOrEmpty(serviceThreadPrefix)) {
64              poolName = serviceThreadPrefix + poolName;
65          }
66          // because the ThreadDeathWatcher is a singleton, tasks submitted to it can come from arbitrary threads and
67          // this can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory
68          // must not be sticky about its thread group
69          threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY, null);
70      }
71  
72      /**
73       * Schedules the specified {@code task} to run when the specified {@code thread} dies.
74       *
75       * @param thread the {@link Thread} to watch
76       * @param task the {@link Runnable} to run when the {@code thread} dies
77       *
78       * @throws IllegalArgumentException if the specified {@code thread} is not alive
79       */
80      public static void watch(Thread thread, Runnable task) {
81          ObjectUtil.checkNotNull(thread, "thread");
82          ObjectUtil.checkNotNull(task, "task");
83  
84          if (!thread.isAlive()) {
85              throw new IllegalArgumentException("thread must be alive.");
86          }
87  
88          schedule(thread, task, true);
89      }
90  
91      /**
92       * Cancels the task scheduled via {@link #watch(Thread, Runnable)}.
93       */
94      public static void unwatch(Thread thread, Runnable task) {
95          schedule(ObjectUtil.checkNotNull(thread, "thread"),
96                  ObjectUtil.checkNotNull(task, "task"),
97                  false);
98      }
99  
100     private static void schedule(Thread thread, Runnable task, boolean isWatch) {
101         pendingEntries.add(new Entry(thread, task, isWatch));
102 
103         if (started.compareAndSet(false, true)) {
104             final Thread watcherThread = threadFactory.newThread(watcher);
105             // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
106             // classloader.
107             // See:
108             // - https://github.com/netty/netty/issues/7290
109             // - https://bugs.openjdk.java.net/browse/JDK-7008595
110             AccessController.doPrivileged(new PrivilegedAction<Void>() {
111                 @Override
112                 public Void run() {
113                     watcherThread.setContextClassLoader(null);
114                     return null;
115                 }
116             });
117 
118             watcherThread.start();
119             ThreadDeathWatcher.watcherThread = watcherThread;
120         }
121     }
122 
123     /**
124      * Waits until the thread of this watcher has no threads to watch and terminates itself.
125      * Because a new watcher thread will be started again on {@link #watch(Thread, Runnable)},
126      * this operation is only useful when you want to ensure that the watcher thread is terminated
127      * <strong>after</strong> your application is shut down and there's no chance of calling
128      * {@link #watch(Thread, Runnable)} afterwards.
129      *
130      * @return {@code true} if and only if the watcher thread has been terminated
131      */
132     public static boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
133         ObjectUtil.checkNotNull(unit, "unit");
134 
135         Thread watcherThread = ThreadDeathWatcher.watcherThread;
136         if (watcherThread != null) {
137             watcherThread.join(unit.toMillis(timeout));
138             return !watcherThread.isAlive();
139         } else {
140             return true;
141         }
142     }
143 
144     private ThreadDeathWatcher() { }
145 
146     private static final class Watcher implements Runnable {
147 
148         private final List<Entry> watchees = new ArrayList<Entry>();
149 
150         @Override
151         public void run() {
152             for (;;) {
153                 fetchWatchees();
154                 notifyWatchees();
155 
156                 // Try once again just in case notifyWatchees() triggered watch() or unwatch().
157                 fetchWatchees();
158                 notifyWatchees();
159 
160                 try {
161                     Thread.sleep(1000);
162                 } catch (InterruptedException ignore) {
163                     // Ignore the interrupt; do not terminate until all tasks are run.
164                 }
165 
166                 if (watchees.isEmpty() && pendingEntries.isEmpty()) {
167 
168                     // Mark the current worker thread as stopped.
169                     // The following CAS must always success and must be uncontended,
170                     // because only one watcher thread should be running at the same time.
171                     boolean stopped = started.compareAndSet(true, false);
172                     assert stopped;
173 
174                     // Check if there are pending entries added by watch() while we do CAS above.
175                     if (pendingEntries.isEmpty()) {
176                         // A) watch() was not invoked and thus there's nothing to handle
177                         //    -> safe to terminate because there's nothing left to do
178                         // B) a new watcher thread started and handled them all
179                         //    -> safe to terminate the new watcher thread will take care the rest
180                         break;
181                     }
182 
183                     // There are pending entries again, added by watch()
184                     if (!started.compareAndSet(false, true)) {
185                         // watch() started a new watcher thread and set 'started' to true.
186                         // -> terminate this thread so that the new watcher reads from pendingEntries exclusively.
187                         break;
188                     }
189 
190                     // watch() added an entry, but this worker was faster to set 'started' to true.
191                     // i.e. a new watcher thread was not started
192                     // -> keep this thread alive to handle the newly added entries.
193                 }
194             }
195         }
196 
197         private void fetchWatchees() {
198             for (;;) {
199                 Entry e = pendingEntries.poll();
200                 if (e == null) {
201                     break;
202                 }
203 
204                 if (e.isWatch) {
205                     watchees.add(e);
206                 } else {
207                     watchees.remove(e);
208                 }
209             }
210         }
211 
212         private void notifyWatchees() {
213             List<Entry> watchees = this.watchees;
214             for (int i = 0; i < watchees.size();) {
215                 Entry e = watchees.get(i);
216                 if (!e.thread.isAlive()) {
217                     watchees.remove(i);
218                     try {
219                         e.task.run();
220                     } catch (Throwable t) {
221                         logger.warn("Thread death watcher task raised an exception:", t);
222                     }
223                 } else {
224                     i ++;
225                 }
226             }
227         }
228     }
229 
230     private static final class Entry {
231         final Thread thread;
232         final Runnable task;
233         final boolean isWatch;
234 
235         Entry(Thread thread, Runnable task, boolean isWatch) {
236             this.thread = thread;
237             this.task = task;
238             this.isWatch = isWatch;
239         }
240 
241         @Override
242         public int hashCode() {
243             return thread.hashCode() ^ task.hashCode();
244         }
245 
246         @Override
247         public boolean equals(Object obj) {
248             if (obj == this) {
249                 return true;
250             }
251 
252             if (!(obj instanceof Entry)) {
253                 return false;
254             }
255 
256             Entry that = (Entry) obj;
257             return thread == that.thread && task == that.task;
258         }
259     }
260 }