View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.util;
18  
19  import io.netty.util.concurrent.DefaultThreadFactory;
20  import io.netty.util.internal.StringUtil;
21  import io.netty.util.internal.SystemPropertyUtil;
22  import io.netty.util.internal.logging.InternalLogger;
23  import io.netty.util.internal.logging.InternalLoggerFactory;
24  
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Queue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  
33  /**
34   * Checks if a thread is alive periodically and runs a task when a thread dies.
35   * <p>
36   * This thread starts a daemon thread to check the state of the threads being watched and to invoke their
37   * associated {@link Runnable}s.  When there is no thread to watch (i.e. all threads are dead), the daemon thread
38   * will terminate itself, and a new daemon thread will be started again when a new watch is added.
39   * </p>
40   */
41  public final class ThreadDeathWatcher {
42  
43      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class);
44      // visible for testing
45      static final ThreadFactory threadFactory;
46  
47      // Use a MPMC queue as we may end up checking isEmpty() from multiple threads which may not be allowed to do
48      // concurrently depending on the implementation of it in a MPSC queue.
49      private static final Queue<Entry> pendingEntries = new ConcurrentLinkedQueue<Entry>();
50      private static final Watcher watcher = new Watcher();
51      private static final AtomicBoolean started = new AtomicBoolean();
52      private static volatile Thread watcherThread;
53  
54      static {
55          String poolName = "threadDeathWatcher";
56          String serviceThreadPrefix = SystemPropertyUtil.get("io.netty.serviceThreadPrefix");
57          if (!StringUtil.isNullOrEmpty(serviceThreadPrefix)) {
58              poolName = serviceThreadPrefix + poolName;
59          }
60          // because the ThreadDeathWatcher is a singleton, tasks submitted to it can come from arbitrary threads and
61          // this can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory
62          // must not be sticky about its thread group
63          threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY, null);
64      }
65  
66      /**
67       * Schedules the specified {@code task} to run when the specified {@code thread} dies.
68       *
69       * @param thread the {@link Thread} to watch
70       * @param task the {@link Runnable} to run when the {@code thread} dies
71       *
72       * @throws IllegalArgumentException if the specified {@code thread} is not alive
73       */
74      public static void watch(Thread thread, Runnable task) {
75          if (thread == null) {
76              throw new NullPointerException("thread");
77          }
78          if (task == null) {
79              throw new NullPointerException("task");
80          }
81          if (!thread.isAlive()) {
82              throw new IllegalArgumentException("thread must be alive.");
83          }
84  
85          schedule(thread, task, true);
86      }
87  
88      /**
89       * Cancels the task scheduled via {@link #watch(Thread, Runnable)}.
90       */
91      public static void unwatch(Thread thread, Runnable task) {
92          if (thread == null) {
93              throw new NullPointerException("thread");
94          }
95          if (task == null) {
96              throw new NullPointerException("task");
97          }
98  
99          schedule(thread, task, false);
100     }
101 
102     private static void schedule(Thread thread, Runnable task, boolean isWatch) {
103         pendingEntries.add(new Entry(thread, task, isWatch));
104 
105         if (started.compareAndSet(false, true)) {
106             Thread watcherThread = threadFactory.newThread(watcher);
107             // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
108             // classloader.
109             // See:
110             // - https://github.com/netty/netty/issues/7290
111             // - https://bugs.openjdk.java.net/browse/JDK-7008595
112             watcherThread.setContextClassLoader(null);
113 
114             watcherThread.start();
115             ThreadDeathWatcher.watcherThread = watcherThread;
116         }
117     }
118 
119     /**
120      * Waits until the thread of this watcher has no threads to watch and terminates itself.
121      * Because a new watcher thread will be started again on {@link #watch(Thread, Runnable)},
122      * this operation is only useful when you want to ensure that the watcher thread is terminated
123      * <strong>after</strong> your application is shut down and there's no chance of calling
124      * {@link #watch(Thread, Runnable)} afterwards.
125      *
126      * @return {@code true} if and only if the watcher thread has been terminated
127      */
128     public static boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
129         if (unit == null) {
130             throw new NullPointerException("unit");
131         }
132 
133         Thread watcherThread = ThreadDeathWatcher.watcherThread;
134         if (watcherThread != null) {
135             watcherThread.join(unit.toMillis(timeout));
136             return !watcherThread.isAlive();
137         } else {
138             return true;
139         }
140     }
141 
142     private ThreadDeathWatcher() { }
143 
144     private static final class Watcher implements Runnable {
145 
146         private final List<Entry> watchees = new ArrayList<Entry>();
147 
148         @Override
149         public void run() {
150             for (;;) {
151                 fetchWatchees();
152                 notifyWatchees();
153 
154                 // Try once again just in case notifyWatchees() triggered watch() or unwatch().
155                 fetchWatchees();
156                 notifyWatchees();
157 
158                 try {
159                     Thread.sleep(1000);
160                 } catch (InterruptedException ignore) {
161                     // Ignore the interrupt; do not terminate until all tasks are run.
162                 }
163 
164                 if (watchees.isEmpty() && pendingEntries.isEmpty()) {
165 
166                     // Mark the current worker thread as stopped.
167                     // The following CAS must always success and must be uncontended,
168                     // because only one watcher thread should be running at the same time.
169                     boolean stopped = started.compareAndSet(true, false);
170                     assert stopped;
171 
172                     // Check if there are pending entries added by watch() while we do CAS above.
173                     if (pendingEntries.isEmpty()) {
174                         // A) watch() was not invoked and thus there's nothing to handle
175                         //    -> safe to terminate because there's nothing left to do
176                         // B) a new watcher thread started and handled them all
177                         //    -> safe to terminate the new watcher thread will take care the rest
178                         break;
179                     }
180 
181                     // There are pending entries again, added by watch()
182                     if (!started.compareAndSet(false, true)) {
183                         // watch() started a new watcher thread and set 'started' to true.
184                         // -> terminate this thread so that the new watcher reads from pendingEntries exclusively.
185                         break;
186                     }
187 
188                     // watch() added an entry, but this worker was faster to set 'started' to true.
189                     // i.e. a new watcher thread was not started
190                     // -> keep this thread alive to handle the newly added entries.
191                 }
192             }
193         }
194 
195         private void fetchWatchees() {
196             for (;;) {
197                 Entry e = pendingEntries.poll();
198                 if (e == null) {
199                     break;
200                 }
201 
202                 if (e.isWatch) {
203                     watchees.add(e);
204                 } else {
205                     watchees.remove(e);
206                 }
207             }
208         }
209 
210         private void notifyWatchees() {
211             List<Entry> watchees = this.watchees;
212             for (int i = 0; i < watchees.size();) {
213                 Entry e = watchees.get(i);
214                 if (!e.thread.isAlive()) {
215                     watchees.remove(i);
216                     try {
217                         e.task.run();
218                     } catch (Throwable t) {
219                         logger.warn("Thread death watcher task raised an exception:", t);
220                     }
221                 } else {
222                     i ++;
223                 }
224             }
225         }
226     }
227 
228     private static final class Entry {
229         final Thread thread;
230         final Runnable task;
231         final boolean isWatch;
232 
233         Entry(Thread thread, Runnable task, boolean isWatch) {
234             this.thread = thread;
235             this.task = task;
236             this.isWatch = isWatch;
237         }
238 
239         @Override
240         public int hashCode() {
241             return thread.hashCode() ^ task.hashCode();
242         }
243 
244         @Override
245         public boolean equals(Object obj) {
246             if (obj == this) {
247                 return true;
248             }
249 
250             if (!(obj instanceof Entry)) {
251                 return false;
252             }
253 
254             Entry that = (Entry) obj;
255             return thread == that.thread && task == that.task;
256         }
257     }
258 }