View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.logging.InternalLogger;
19  import io.netty.util.internal.logging.InternalLoggerFactory;
20  
21  import java.util.Queue;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.Executors;
24  import java.util.concurrent.LinkedBlockingQueue;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.ThreadFactory;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  
30  /**
31   * Single-thread singleton {@link EventExecutor}.  It starts the thread automatically and stops it when there is no
32   * task pending in the task queue for 1 second.  Please note it is not scalable to schedule large number of tasks to
33   * this executor; use a dedicated executor.
34   */
35  public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
36  
37      private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
38  
39      private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
40  
41      public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
42  
43      final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
44      final ScheduledFutureTask<Void> purgeTask = new ScheduledFutureTask<Void>(
45              this, Executors.<Void>callable(new PurgeTask(), null),
46              ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL);
47  
48      private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass());
49      private final TaskRunner taskRunner = new TaskRunner();
50      private final AtomicBoolean started = new AtomicBoolean();
51      volatile Thread thread;
52  
53      private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
54  
55      private GlobalEventExecutor() {
56          scheduledTaskQueue().add(purgeTask);
57      }
58  
59      /**
60       * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
61       *
62       * @return {@code null} if the executor thread has been interrupted or waken up.
63       */
64      Runnable takeTask() {
65          BlockingQueue<Runnable> taskQueue = this.taskQueue;
66          for (;;) {
67              ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
68              if (scheduledTask == null) {
69                  Runnable task = null;
70                  try {
71                      task = taskQueue.take();
72                  } catch (InterruptedException e) {
73                      // Ignore
74                  }
75                  return task;
76              } else {
77                  long delayNanos = scheduledTask.delayNanos();
78                  Runnable task;
79                  if (delayNanos > 0) {
80                      try {
81                          task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
82                      } catch (InterruptedException e) {
83                          // Waken up.
84                          return null;
85                      }
86                  } else {
87                      task = taskQueue.poll();
88                  }
89  
90                  if (task == null) {
91                      fetchFromScheduledTaskQueue();
92                      task = taskQueue.poll();
93                  }
94  
95                  if (task != null) {
96                      return task;
97                  }
98              }
99          }
100     }
101 
102     private void fetchFromScheduledTaskQueue() {
103         if (hasScheduledTasks()) {
104             long nanoTime = AbstractScheduledEventExecutor.nanoTime();
105             for (;;) {
106                 Runnable scheduledTask = pollScheduledTask(nanoTime);
107                 if (scheduledTask == null) {
108                     break;
109                 }
110                 taskQueue.add(scheduledTask);
111             }
112         }
113     }
114 
115     /**
116      * Return the number of tasks that are pending for processing.
117      *
118      * <strong>Be aware that this operation may be expensive as it depends on the internal implementation of the
119      * SingleThreadEventExecutor. So use it was care!</strong>
120      */
121     public int pendingTasks() {
122         return taskQueue.size();
123     }
124 
125     /**
126      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
127      * before.
128      */
129     private void addTask(Runnable task) {
130         if (task == null) {
131             throw new NullPointerException("task");
132         }
133         taskQueue.add(task);
134     }
135 
136     @Override
137     public boolean inEventLoop(Thread thread) {
138         return thread == this.thread;
139     }
140 
141     @Override
142     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
143         return terminationFuture();
144     }
145 
146     @Override
147     public Future<?> terminationFuture() {
148         return terminationFuture;
149     }
150 
151     @Override
152     @Deprecated
153     public void shutdown() {
154         throw new UnsupportedOperationException();
155     }
156 
157     @Override
158     public boolean isShuttingDown() {
159         return false;
160     }
161 
162     @Override
163     public boolean isShutdown() {
164         return false;
165     }
166 
167     @Override
168     public boolean isTerminated() {
169         return false;
170     }
171 
172     @Override
173     public boolean awaitTermination(long timeout, TimeUnit unit) {
174         return false;
175     }
176 
177     /**
178      * Waits until the worker thread of this executor has no tasks left in its task queue and terminates itself.
179      * Because a new worker thread will be started again when a new task is submitted, this operation is only useful
180      * when you want to ensure that the worker thread is terminated <strong>after</strong> your application is shut
181      * down and there's no chance of submitting a new task afterwards.
182      *
183      * @return {@code true} if and only if the worker thread has been terminated
184      */
185     public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
186         if (unit == null) {
187             throw new NullPointerException("unit");
188         }
189 
190         final Thread thread = this.thread;
191         if (thread == null) {
192             throw new IllegalStateException("thread was not started");
193         }
194         thread.join(unit.toMillis(timeout));
195         return !thread.isAlive();
196     }
197 
198     @Override
199     public void execute(Runnable task) {
200         if (task == null) {
201             throw new NullPointerException("task");
202         }
203 
204         addTask(task);
205         if (!inEventLoop()) {
206             startThread();
207         }
208     }
209 
210     private void startThread() {
211         if (started.compareAndSet(false, true)) {
212             Thread t = threadFactory.newThread(taskRunner);
213             t.start();
214             thread = t;
215         }
216     }
217 
218     final class TaskRunner implements Runnable {
219         @Override
220         public void run() {
221             for (;;) {
222                 Runnable task = takeTask();
223                 if (task != null) {
224                     try {
225                         task.run();
226                     } catch (Throwable t) {
227                         logger.warn("Unexpected exception from the global event executor: ", t);
228                     }
229 
230                     if (task != purgeTask) {
231                         continue;
232                     }
233                 }
234 
235                 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
236                 // Terminate if there is no task in the queue (except the purge task).
237                 if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
238                     // Mark the current thread as stopped.
239                     // The following CAS must always success and must be uncontended,
240                     // because only one thread should be running at the same time.
241                     boolean stopped = started.compareAndSet(true, false);
242                     assert stopped;
243 
244                     // Check if there are pending entries added by execute() or schedule*() while we do CAS above.
245                     if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
246                         // A) No new task was added and thus there's nothing to handle
247                         //    -> safe to terminate because there's nothing left to do
248                         // B) A new thread started and handled all the new tasks.
249                         //    -> safe to terminate the new thread will take care the rest
250                         break;
251                     }
252 
253                     // There are pending tasks added again.
254                     if (!started.compareAndSet(false, true)) {
255                         // startThread() started a new thread and set 'started' to true.
256                         // -> terminate this thread so that the new thread reads from taskQueue exclusively.
257                         break;
258                     }
259 
260                     // New tasks were added, but this worker was faster to set 'started' to true.
261                     // i.e. a new worker thread was not started by startThread().
262                     // -> keep this thread alive to handle the newly added entries.
263                 }
264             }
265         }
266     }
267 
268     private final class PurgeTask implements Runnable {
269         @Override
270         public void run() {
271             purgeCancelledScheduledTasks();
272         }
273     }
274 }