View Javadoc

1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.util;
18  
19  import io.netty.util.concurrent.DefaultThreadFactory;
20  import io.netty.util.internal.StringUtil;
21  import io.netty.util.internal.SystemPropertyUtil;
22  import io.netty.util.internal.logging.InternalLogger;
23  import io.netty.util.internal.logging.InternalLoggerFactory;
24  
25  import java.security.AccessController;
26  import java.security.PrivilegedAction;
27  import java.util.ArrayList;
28  import java.util.List;
29  import java.util.Queue;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  
35  /**
36   * Checks if a thread is alive periodically and runs a task when a thread dies.
37   * <p>
38   * This thread starts a daemon thread to check the state of the threads being watched and to invoke their
39   * associated {@link Runnable}s.  When there is no thread to watch (i.e. all threads are dead), the daemon thread
40   * will terminate itself, and a new daemon thread will be started again when a new watch is added.
41   * </p>
42   *
43   * @deprecated will be removed in the next major release
44   */
45  @Deprecated
46  public final class ThreadDeathWatcher {
47  
48      private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class);
49      // visible for testing
50      static final ThreadFactory threadFactory;
51  
52      // Use a MPMC queue as we may end up checking isEmpty() from multiple threads which may not be allowed to do
53      // concurrently depending on the implementation of it in a MPSC queue.
54      private static final Queue<Entry> pendingEntries = new ConcurrentLinkedQueue<Entry>();
55      private static final Watcher watcher = new Watcher();
56      private static final AtomicBoolean started = new AtomicBoolean();
57      private static volatile Thread watcherThread;
58  
59      static {
60          String poolName = "threadDeathWatcher";
61          String serviceThreadPrefix = SystemPropertyUtil.get("io.netty.serviceThreadPrefix");
62          if (!StringUtil.isNullOrEmpty(serviceThreadPrefix)) {
63              poolName = serviceThreadPrefix + poolName;
64          }
65          // because the ThreadDeathWatcher is a singleton, tasks submitted to it can come from arbitrary threads and
66          // this can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory
67          // must not be sticky about its thread group
68          threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY, null);
69      }
70  
71      /**
72       * Schedules the specified {@code task} to run when the specified {@code thread} dies.
73       *
74       * @param thread the {@link Thread} to watch
75       * @param task the {@link Runnable} to run when the {@code thread} dies
76       *
77       * @throws IllegalArgumentException if the specified {@code thread} is not alive
78       */
79      public static void watch(Thread thread, Runnable task) {
80          if (thread == null) {
81              throw new NullPointerException("thread");
82          }
83          if (task == null) {
84              throw new NullPointerException("task");
85          }
86          if (!thread.isAlive()) {
87              throw new IllegalArgumentException("thread must be alive.");
88          }
89  
90          schedule(thread, task, true);
91      }
92  
93      /**
94       * Cancels the task scheduled via {@link #watch(Thread, Runnable)}.
95       */
96      public static void unwatch(Thread thread, Runnable task) {
97          if (thread == null) {
98              throw new NullPointerException("thread");
99          }
100         if (task == null) {
101             throw new NullPointerException("task");
102         }
103 
104         schedule(thread, task, false);
105     }
106 
107     private static void schedule(Thread thread, Runnable task, boolean isWatch) {
108         pendingEntries.add(new Entry(thread, task, isWatch));
109 
110         if (started.compareAndSet(false, true)) {
111             final Thread watcherThread = threadFactory.newThread(watcher);
112             // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
113             // classloader.
114             // See:
115             // - https://github.com/netty/netty/issues/7290
116             // - https://bugs.openjdk.java.net/browse/JDK-7008595
117             AccessController.doPrivileged(new PrivilegedAction<Void>() {
118                 @Override
119                 public Void run() {
120                     watcherThread.setContextClassLoader(null);
121                     return null;
122                 }
123             });
124 
125             watcherThread.start();
126             ThreadDeathWatcher.watcherThread = watcherThread;
127         }
128     }
129 
130     /**
131      * Waits until the thread of this watcher has no threads to watch and terminates itself.
132      * Because a new watcher thread will be started again on {@link #watch(Thread, Runnable)},
133      * this operation is only useful when you want to ensure that the watcher thread is terminated
134      * <strong>after</strong> your application is shut down and there's no chance of calling
135      * {@link #watch(Thread, Runnable)} afterwards.
136      *
137      * @return {@code true} if and only if the watcher thread has been terminated
138      */
139     public static boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
140         if (unit == null) {
141             throw new NullPointerException("unit");
142         }
143 
144         Thread watcherThread = ThreadDeathWatcher.watcherThread;
145         if (watcherThread != null) {
146             watcherThread.join(unit.toMillis(timeout));
147             return !watcherThread.isAlive();
148         } else {
149             return true;
150         }
151     }
152 
153     private ThreadDeathWatcher() { }
154 
155     private static final class Watcher implements Runnable {
156 
157         private final List<Entry> watchees = new ArrayList<Entry>();
158 
159         @Override
160         public void run() {
161             for (;;) {
162                 fetchWatchees();
163                 notifyWatchees();
164 
165                 // Try once again just in case notifyWatchees() triggered watch() or unwatch().
166                 fetchWatchees();
167                 notifyWatchees();
168 
169                 try {
170                     Thread.sleep(1000);
171                 } catch (InterruptedException ignore) {
172                     // Ignore the interrupt; do not terminate until all tasks are run.
173                 }
174 
175                 if (watchees.isEmpty() && pendingEntries.isEmpty()) {
176 
177                     // Mark the current worker thread as stopped.
178                     // The following CAS must always success and must be uncontended,
179                     // because only one watcher thread should be running at the same time.
180                     boolean stopped = started.compareAndSet(true, false);
181                     assert stopped;
182 
183                     // Check if there are pending entries added by watch() while we do CAS above.
184                     if (pendingEntries.isEmpty()) {
185                         // A) watch() was not invoked and thus there's nothing to handle
186                         //    -> safe to terminate because there's nothing left to do
187                         // B) a new watcher thread started and handled them all
188                         //    -> safe to terminate the new watcher thread will take care the rest
189                         break;
190                     }
191 
192                     // There are pending entries again, added by watch()
193                     if (!started.compareAndSet(false, true)) {
194                         // watch() started a new watcher thread and set 'started' to true.
195                         // -> terminate this thread so that the new watcher reads from pendingEntries exclusively.
196                         break;
197                     }
198 
199                     // watch() added an entry, but this worker was faster to set 'started' to true.
200                     // i.e. a new watcher thread was not started
201                     // -> keep this thread alive to handle the newly added entries.
202                 }
203             }
204         }
205 
206         private void fetchWatchees() {
207             for (;;) {
208                 Entry e = pendingEntries.poll();
209                 if (e == null) {
210                     break;
211                 }
212 
213                 if (e.isWatch) {
214                     watchees.add(e);
215                 } else {
216                     watchees.remove(e);
217                 }
218             }
219         }
220 
221         private void notifyWatchees() {
222             List<Entry> watchees = this.watchees;
223             for (int i = 0; i < watchees.size();) {
224                 Entry e = watchees.get(i);
225                 if (!e.thread.isAlive()) {
226                     watchees.remove(i);
227                     try {
228                         e.task.run();
229                     } catch (Throwable t) {
230                         logger.warn("Thread death watcher task raised an exception:", t);
231                     }
232                 } else {
233                     i ++;
234                 }
235             }
236         }
237     }
238 
239     private static final class Entry {
240         final Thread thread;
241         final Runnable task;
242         final boolean isWatch;
243 
244         Entry(Thread thread, Runnable task, boolean isWatch) {
245             this.thread = thread;
246             this.task = task;
247             this.isWatch = isWatch;
248         }
249 
250         @Override
251         public int hashCode() {
252             return thread.hashCode() ^ task.hashCode();
253         }
254 
255         @Override
256         public boolean equals(Object obj) {
257             if (obj == this) {
258                 return true;
259             }
260 
261             if (!(obj instanceof Entry)) {
262                 return false;
263             }
264 
265             Entry that = (Entry) obj;
266             return thread == that.thread && task == that.task;
267         }
268     }
269 }