View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util.concurrent;
17  
18  import io.netty5.util.internal.ThreadExecutorMap;
19  import io.netty5.util.internal.logging.InternalLogger;
20  import io.netty5.util.internal.logging.InternalLoggerFactory;
21  import org.jetbrains.annotations.Async.Execute;
22  import org.jetbrains.annotations.Async.Schedule;
23  
24  import java.security.AccessController;
25  import java.security.PrivilegedAction;
26  import java.util.Queue;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.LinkedBlockingQueue;
30  import java.util.concurrent.RejectedExecutionException;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  
35  import static java.util.Objects.requireNonNull;
36  
37  /**
38   * Single-thread singleton {@link EventExecutor}.  It starts the thread automatically and stops it when there is no
39   * task pending in the task queue for 1 second.  Please note it is not scalable to schedule large number of tasks to
40   * this executor; use a dedicated executor.
41   */
42  public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
43  
44      private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
45  
46      private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);
47  
48      private final RunnableScheduledFutureAdapter<Void> quietPeriodTask;
49      public static final GlobalEventExecutor INSTANCE;
50  
51      static {
52          INSTANCE = new GlobalEventExecutor();
53      }
54  
55      private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
56  
57      // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
58      // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
59      // be sticky about its thread group
60      // visible for testing
61      final ThreadFactory threadFactory;
62      private final TaskRunner taskRunner = new TaskRunner();
63      private final AtomicBoolean started = new AtomicBoolean();
64      volatile Thread thread;
65  
66      private final Future<Void> terminationFuture = DefaultPromise.<Void>newFailedPromise(
67              this, new UnsupportedOperationException()).asFuture();
68  
69      private GlobalEventExecutor() {
70          threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
71                  DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
72          quietPeriodTask = new RunnableScheduledFutureAdapter<>(
73                  this, newPromise(), Executors.callable(() -> {
74              // NOOP
75          }, null),
76                  // note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise
77                  // the method could be overridden leading to unsafe initialization here!
78                  deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
79                  -SCHEDULE_QUIET_PERIOD_INTERVAL);
80  
81          scheduledTaskQueue().add(quietPeriodTask);
82      }
83  
84      /**
85       * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
86       *
87       * @return {@code null} if the executor thread has been interrupted or waken up.
88       */
89      private Runnable takeTask() {
90          BlockingQueue<Runnable> taskQueue = this.taskQueue;
91          for (;;) {
92              RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
93              if (scheduledTask == null) {
94                  Runnable task = null;
95                  try {
96                      task = taskQueue.take();
97                  } catch (InterruptedException e) {
98                      // Ignore
99                  }
100                 return task;
101             } else {
102                 long delayNanos = scheduledTask.delayNanos();
103                 Runnable task = null;
104                 if (delayNanos > 0) {
105                     try {
106                         task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
107                     } catch (InterruptedException e) {
108                         // Waken up.
109                         return null;
110                     }
111                 }
112                 if (task == null) {
113                     // We need to fetch the scheduled tasks now as otherwise there may be a chance that
114                     // scheduled tasks are never executed if there is always one task in the taskQueue.
115                     // This is for example true for the read task of OIO Transport
116                     // See https://github.com/netty/netty/issues/1614
117                     fetchFromScheduledTaskQueue();
118                     task = taskQueue.poll();
119                 }
120 
121                 if (task != null) {
122                     return task;
123                 }
124             }
125         }
126     }
127 
128     private void fetchFromScheduledTaskQueue() {
129         long nanoTime = getCurrentTimeNanos();
130         Runnable scheduledTask = pollScheduledTask(nanoTime);
131         while (scheduledTask != null) {
132             taskQueue.add(scheduledTask);
133             scheduledTask = pollScheduledTask(nanoTime);
134         }
135     }
136 
137     /**
138      * Return the number of tasks that are pending for processing.
139      */
140     public int pendingTasks() {
141         return taskQueue.size();
142     }
143 
144     /**
145      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
146      * before.
147      */
148     private void addTask(Runnable task) {
149         requireNonNull(task, "task");
150         taskQueue.add(task);
151     }
152 
153     @Override
154     public boolean inEventLoop(Thread thread) {
155         return thread == this.thread;
156     }
157 
158     @Override
159     public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
160         return terminationFuture();
161     }
162 
163     @Override
164     public Future<Void> terminationFuture() {
165         return terminationFuture;
166     }
167 
168     @Override
169     public boolean isShuttingDown() {
170         return false;
171     }
172 
173     @Override
174     public boolean isShutdown() {
175         return false;
176     }
177 
178     @Override
179     public boolean isTerminated() {
180         return false;
181     }
182 
183     @Override
184     public boolean awaitTermination(long timeout, TimeUnit unit) {
185         return false;
186     }
187 
188     /**
189      * Waits until the worker thread of this executor has no tasks left in its task queue and terminates itself.
190      * Because a new worker thread will be started again when a new task is submitted, this operation is only useful
191      * when you want to ensure that the worker thread is terminated <strong>after</strong> your application is shut
192      * down and there's no chance of submitting a new task afterwards.
193      *
194      * @return {@code true} if and only if the worker thread has been terminated
195      */
196     public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
197         requireNonNull(unit, "unit");
198 
199         final Thread thread = this.thread;
200         if (thread == null) {
201             throw new IllegalStateException("thread was not started");
202         }
203         thread.join(unit.toMillis(timeout));
204         return !thread.isAlive();
205     }
206 
207     @Override
208     public void execute(@Schedule Runnable task) {
209         requireNonNull(task, "task");
210 
211         addTask(task);
212         if (!inEventLoop()) {
213             startThread();
214         }
215     }
216 
217     private void startThread() {
218         if (started.compareAndSet(false, true)) {
219             final Thread t = threadFactory.newThread(taskRunner);
220             // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
221             // classloader.
222             // See:
223             // - https://github.com/netty/netty/issues/7290
224             // - https://bugs.openjdk.java.net/browse/JDK-7008595
225             AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
226                 t.setContextClassLoader(null);
227                 return null;
228             });
229 
230             // Set the thread before starting it as otherwise inEventLoop() may return false and so produce
231             // an assert error.
232             // See https://github.com/netty/netty/issues/4357
233             thread = t;
234             t.start();
235         }
236     }
237 
238     final class TaskRunner implements Runnable {
239         @Override
240         public void run() {
241             for (;;) {
242                 Runnable task = takeTask();
243                 if (task != null) {
244                     try {
245                         runTask(task);
246                     } catch (Throwable t) {
247                         logger.warn("Unexpected exception from the global event executor: ", t);
248                     }
249 
250                     if (task != quietPeriodTask) {
251                         continue;
252                     }
253                 }
254 
255                 Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = scheduledTaskQueue();
256                 // Terminate if there is no task in the queue (except the noop task).
257                 if (taskQueue.isEmpty() && scheduledTaskQueue.size() <= 1) {
258                     // Mark the current thread as stopped.
259                     // The following CAS must always success and must be uncontended,
260                     // because only one thread should be running at the same time.
261                     boolean stopped = started.compareAndSet(true, false);
262                     assert stopped;
263 
264                     // Do not check scheduledTaskQueue because it is not thread-safe and can only be mutated from a
265                     // TaskRunner actively running tasks.
266                     if (taskQueue.isEmpty()) {
267                         // A) No new task was added and thus there's nothing to handle
268                         //    -> safe to terminate because there's nothing left to do
269                         // B) A new thread started and handled all the new tasks.
270                         //    -> safe to terminate the new thread will take care the rest
271                         break;
272                     }
273 
274                     // There are pending tasks added again.
275                     if (!started.compareAndSet(false, true)) {
276                         // startThread() started a new thread and set 'started' to true.
277                         // -> terminate this thread so that the new thread reads from taskQueue exclusively.
278                         break;
279                     }
280 
281                     // New tasks were added, but this worker was faster to set 'started' to true.
282                     // i.e. a new worker thread was not started by startThread().
283                     // -> keep this thread alive to handle the newly added entries.
284                 }
285             }
286         }
287 
288         private void runTask(@Execute Runnable task) {
289             task.run();
290         }
291     }
292 }