View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.ThreadExecutorMap;
22  import io.netty.util.internal.logging.InternalLogger;
23  import io.netty.util.internal.logging.InternalLoggerFactory;
24  import org.jetbrains.annotations.Async.Schedule;
25  
26  import java.lang.Thread.State;
27  import java.util.ArrayList;
28  import java.util.Collection;
29  import java.util.LinkedHashSet;
30  import java.util.List;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.BlockingQueue;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.ThreadFactory;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.TimeoutException;
43  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
45  import java.util.concurrent.locks.Lock;
46  import java.util.concurrent.locks.ReentrantLock;
47  
48  /**
49   * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
50   *
51   */
52  public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
53  
54      static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
55              SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
56  
57      private static final InternalLogger logger =
58              InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
59  
60      private static final int ST_NOT_STARTED = 1;
61      private static final int ST_SUSPENDING = 2;
62      private static final int ST_SUSPENDED = 3;
63      private static final int ST_STARTED = 4;
64      private static final int ST_SHUTTING_DOWN = 5;
65      private static final int ST_SHUTDOWN = 6;
66      private static final int ST_TERMINATED = 7;
67  
68      private static final Runnable NOOP_TASK = new Runnable() {
69          @Override
70          public void run() {
71              // Do nothing.
72          }
73      };
74  
75      private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
76              AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
77      private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
78              AtomicReferenceFieldUpdater.newUpdater(
79                      SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
80      private final Queue<Runnable> taskQueue;
81  
82      private volatile Thread thread;
83      @SuppressWarnings("unused")
84      private volatile ThreadProperties threadProperties;
85      private final Executor executor;
86      private volatile boolean interrupted;
87  
88      private final Lock processingLock = new ReentrantLock();
89      private final CountDownLatch threadLock = new CountDownLatch(1);
90      private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
91      private final boolean addTaskWakesUp;
92      private final int maxPendingTasks;
93      private final RejectedExecutionHandler rejectedExecutionHandler;
94      private final boolean supportSuspension;
95  
96      private long lastExecutionTime;
97  
98      @SuppressWarnings({ "FieldMayBeFinal", "unused" })
99      private volatile int state = ST_NOT_STARTED;
100 
101     private volatile long gracefulShutdownQuietPeriod;
102     private volatile long gracefulShutdownTimeout;
103     private long gracefulShutdownStartTime;
104 
105     private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
106 
107     /**
108      * Create a new instance
109      *
110      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
111      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
112      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
113      *                          executor thread
114      */
115     protected SingleThreadEventExecutor(
116             EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
117         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
118     }
119 
120     /**
121      * Create a new instance
122      *
123      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
124      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
125      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
126      *                          executor thread
127      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
128      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
129      */
130     protected SingleThreadEventExecutor(
131             EventExecutorGroup parent, ThreadFactory threadFactory,
132             boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
133         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
134     }
135 
136     /**
137      * Create a new instance
138      *
139      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
140      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
141      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
142      *                          executor thread
143      * @param supportSuspension {@code true} if suspension of this {@link SingleThreadEventExecutor} is supported.
144      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
145      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
146      */
147     protected SingleThreadEventExecutor(
148             EventExecutorGroup parent, ThreadFactory threadFactory,
149             boolean addTaskWakesUp, boolean supportSuspension,
150             int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
151         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, supportSuspension,
152                 maxPendingTasks, rejectedHandler);
153     }
154 
155     /**
156      * Create a new instance
157      *
158      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
159      * @param executor          the {@link Executor} which will be used for executing
160      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
161      *                          executor thread
162      */
163     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
164         this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
165     }
166 
167     /**
168      * Create a new instance
169      *
170      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
171      * @param executor          the {@link Executor} which will be used for executing
172      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
173      *                          executor thread
174      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
175      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
176      */
177     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
178                                         boolean addTaskWakesUp, int maxPendingTasks,
179                                         RejectedExecutionHandler rejectedHandler) {
180         this(parent, executor, addTaskWakesUp, false, maxPendingTasks, rejectedHandler);
181     }
182 
183     /**
184      * Create a new instance
185      *
186      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
187      * @param executor          the {@link Executor} which will be used for executing
188      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
189      *                          executor thread
190      * @param supportSuspension {@code true} if suspension of this {@link SingleThreadEventExecutor} is supported.
191      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
192      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
193      */
194     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
195                                         boolean addTaskWakesUp, boolean supportSuspension,
196                                         int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
197         super(parent);
198         this.addTaskWakesUp = addTaskWakesUp;
199         this.supportSuspension = supportSuspension;
200         this.maxPendingTasks = Math.max(16, maxPendingTasks);
201         this.executor = ThreadExecutorMap.apply(executor, this);
202         taskQueue = newTaskQueue(this.maxPendingTasks);
203         rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
204     }
205 
206     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
207                                         boolean addTaskWakesUp, Queue<Runnable> taskQueue,
208                                         RejectedExecutionHandler rejectedHandler) {
209         this(parent, executor, addTaskWakesUp, false, taskQueue, rejectedHandler);
210     }
211 
212     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
213                                         boolean addTaskWakesUp, boolean supportSuspension,
214                                         Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
215         super(parent);
216         this.addTaskWakesUp = addTaskWakesUp;
217         this.supportSuspension = supportSuspension;
218         this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
219         this.executor = ThreadExecutorMap.apply(executor, this);
220         this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
221         this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
222     }
223 
224     /**
225      * @deprecated Please use and override {@link #newTaskQueue(int)}.
226      */
227     @Deprecated
228     protected Queue<Runnable> newTaskQueue() {
229         return newTaskQueue(maxPendingTasks);
230     }
231 
232     /**
233      * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
234      * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
235      * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
236      * implementation that does not support blocking operations at all.
237      */
238     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
239         return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
240     }
241 
242     /**
243      * Interrupt the current running {@link Thread}.
244      */
245     protected void interruptThread() {
246         Thread currentThread = thread;
247         if (currentThread == null) {
248             interrupted = true;
249         } else {
250             currentThread.interrupt();
251         }
252     }
253 
254     /**
255      * @see Queue#poll()
256      */
257     protected Runnable pollTask() {
258         assert inEventLoop();
259         return pollTaskFrom(taskQueue);
260     }
261 
262     protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
263         for (;;) {
264             Runnable task = taskQueue.poll();
265             if (task != WAKEUP_TASK) {
266                 return task;
267             }
268         }
269     }
270 
271     /**
272      * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
273      * <p>
274      * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
275      * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
276      * </p>
277      *
278      * @return {@code null} if the executor thread has been interrupted or waken up.
279      */
280     protected Runnable takeTask() {
281         assert inEventLoop();
282         if (!(taskQueue instanceof BlockingQueue)) {
283             throw new UnsupportedOperationException();
284         }
285 
286         BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
287         for (;;) {
288             ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
289             if (scheduledTask == null) {
290                 Runnable task = null;
291                 try {
292                     task = taskQueue.take();
293                     if (task == WAKEUP_TASK) {
294                         task = null;
295                     }
296                 } catch (InterruptedException e) {
297                     // Ignore
298                 }
299                 return task;
300             } else {
301                 long delayNanos = scheduledTask.delayNanos();
302                 Runnable task = null;
303                 if (delayNanos > 0) {
304                     try {
305                         task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
306                     } catch (InterruptedException e) {
307                         // Waken up.
308                         return null;
309                     }
310                 }
311                 if (task == null) {
312                     // We need to fetch the scheduled tasks now as otherwise there may be a chance that
313                     // scheduled tasks are never executed if there is always one task in the taskQueue.
314                     // This is for example true for the read task of OIO Transport
315                     // See https://github.com/netty/netty/issues/1614
316                     fetchFromScheduledTaskQueue();
317                     task = taskQueue.poll();
318                 }
319 
320                 if (task != null) {
321                     if (task == WAKEUP_TASK) {
322                         return null;
323                     }
324                     return task;
325                 }
326             }
327         }
328     }
329 
330     private boolean fetchFromScheduledTaskQueue() {
331         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
332             return true;
333         }
334         long nanoTime = getCurrentTimeNanos();
335         for (;;) {
336             Runnable scheduledTask = pollScheduledTask(nanoTime);
337             if (scheduledTask == null) {
338                 return true;
339             }
340             if (!taskQueue.offer(scheduledTask)) {
341                 // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
342                 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
343                 return false;
344             }
345         }
346     }
347 
348     /**
349      * @return {@code true} if at least one scheduled task was executed.
350      */
351     private boolean executeExpiredScheduledTasks() {
352         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
353             return false;
354         }
355         long nanoTime = getCurrentTimeNanos();
356         Runnable scheduledTask = pollScheduledTask(nanoTime);
357         if (scheduledTask == null) {
358             return false;
359         }
360         do {
361             safeExecute(scheduledTask);
362         } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
363         return true;
364     }
365 
366     /**
367      * @see Queue#peek()
368      */
369     protected Runnable peekTask() {
370         assert inEventLoop();
371         return taskQueue.peek();
372     }
373 
374     /**
375      * @see Queue#isEmpty()
376      */
377     protected boolean hasTasks() {
378         assert inEventLoop();
379         return !taskQueue.isEmpty();
380     }
381 
382     /**
383      * Return the number of tasks that are pending for processing.
384      */
385     public int pendingTasks() {
386         return taskQueue.size();
387     }
388 
389     /**
390      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
391      * before.
392      */
393     protected void addTask(Runnable task) {
394         ObjectUtil.checkNotNull(task, "task");
395         if (!offerTask(task)) {
396             reject(task);
397         }
398     }
399 
400     final boolean offerTask(Runnable task) {
401         if (isShutdown()) {
402             reject();
403         }
404         return taskQueue.offer(task);
405     }
406 
407     /**
408      * @see Queue#remove(Object)
409      */
410     protected boolean removeTask(Runnable task) {
411         return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
412     }
413 
414     /**
415      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
416      *
417      * @return {@code true} if and only if at least one task was run
418      */
419     protected boolean runAllTasks() {
420         assert inEventLoop();
421         boolean fetchedAll;
422         boolean ranAtLeastOne = false;
423 
424         do {
425             fetchedAll = fetchFromScheduledTaskQueue();
426             if (runAllTasksFrom(taskQueue)) {
427                 ranAtLeastOne = true;
428             }
429         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
430 
431         if (ranAtLeastOne) {
432             lastExecutionTime = getCurrentTimeNanos();
433         }
434         afterRunningAllTasks();
435         return ranAtLeastOne;
436     }
437 
438     /**
439      * Execute all expired scheduled tasks and all current tasks in the executor queue until both queues are empty,
440      * or {@code maxDrainAttempts} has been exceeded.
441      * @param maxDrainAttempts The maximum amount of times this method attempts to drain from queues. This is to prevent
442      *                         continuous task execution and scheduling from preventing the EventExecutor thread to
443      *                         make progress and return to the selector mechanism to process inbound I/O events.
444      * @return {@code true} if at least one task was run.
445      */
446     protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
447         assert inEventLoop();
448         boolean ranAtLeastOneTask;
449         int drainAttempt = 0;
450         do {
451             // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
452             // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
453             ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
454         } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
455 
456         if (drainAttempt > 0) {
457             lastExecutionTime = getCurrentTimeNanos();
458         }
459         afterRunningAllTasks();
460 
461         return drainAttempt > 0;
462     }
463 
464     /**
465      * Runs all tasks from the passed {@code taskQueue}.
466      *
467      * @param taskQueue To poll and execute all tasks.
468      *
469      * @return {@code true} if at least one task was executed.
470      */
471     protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
472         Runnable task = pollTaskFrom(taskQueue);
473         if (task == null) {
474             return false;
475         }
476         for (;;) {
477             safeExecute(task);
478             task = pollTaskFrom(taskQueue);
479             if (task == null) {
480                 return true;
481             }
482         }
483     }
484 
485     /**
486      * What ever tasks are present in {@code taskQueue} when this method is invoked will be {@link Runnable#run()}.
487      * @param taskQueue the task queue to drain.
488      * @return {@code true} if at least {@link Runnable#run()} was called.
489      */
490     private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
491         Runnable task = pollTaskFrom(taskQueue);
492         if (task == null) {
493             return false;
494         }
495         int remaining = Math.min(maxPendingTasks, taskQueue.size());
496         safeExecute(task);
497         // Use taskQueue.poll() directly rather than pollTaskFrom() since the latter may
498         // silently consume more than one item from the queue (skips over WAKEUP_TASK instances)
499         while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
500             safeExecute(task);
501         }
502         return true;
503     }
504 
505     /**
506      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
507      * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
508      */
509     protected boolean runAllTasks(long timeoutNanos) {
510         fetchFromScheduledTaskQueue();
511         Runnable task = pollTask();
512         if (task == null) {
513             afterRunningAllTasks();
514             return false;
515         }
516 
517         final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
518         long runTasks = 0;
519         long lastExecutionTime;
520         for (;;) {
521             safeExecute(task);
522 
523             runTasks ++;
524 
525             // Check timeout every 64 tasks because nanoTime() is relatively expensive.
526             // XXX: Hard-coded value - will make it configurable if it is really a problem.
527             if ((runTasks & 0x3F) == 0) {
528                 lastExecutionTime = getCurrentTimeNanos();
529                 if (lastExecutionTime >= deadline) {
530                     break;
531                 }
532             }
533 
534             task = pollTask();
535             if (task == null) {
536                 lastExecutionTime = getCurrentTimeNanos();
537                 break;
538             }
539         }
540 
541         afterRunningAllTasks();
542         this.lastExecutionTime = lastExecutionTime;
543         return true;
544     }
545 
546     /**
547      * Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}.
548      */
549     protected void afterRunningAllTasks() { }
550 
551     /**
552      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
553      */
554     protected long delayNanos(long currentTimeNanos) {
555         currentTimeNanos -= initialNanoTime();
556 
557         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
558         if (scheduledTask == null) {
559             return SCHEDULE_PURGE_INTERVAL;
560         }
561 
562         return scheduledTask.delayNanos(currentTimeNanos);
563     }
564 
565     /**
566      * Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()}) at which the next
567      * closest scheduled task should run.
568      */
569     protected long deadlineNanos() {
570         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
571         if (scheduledTask == null) {
572             return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
573         }
574         return scheduledTask.deadlineNanos();
575     }
576 
577     /**
578      * Updates the internal timestamp that tells when a submitted task was executed most recently.
579      * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
580      * usually no need to call this method.  However, if you take the tasks manually using {@link #takeTask()} or
581      * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
582      * checks.
583      */
584     protected void updateLastExecutionTime() {
585         lastExecutionTime = getCurrentTimeNanos();
586     }
587 
588     /**
589      * Run the tasks in the {@link #taskQueue}
590      */
591     protected abstract void run();
592 
593     /**
594      * Do nothing, sub-classes may override
595      */
596     protected void cleanup() {
597         // NOOP
598     }
599 
600     protected void wakeup(boolean inEventLoop) {
601         if (!inEventLoop) {
602             // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
603             // is already something in the queue.
604             taskQueue.offer(WAKEUP_TASK);
605         }
606     }
607 
608     @Override
609     public boolean inEventLoop(Thread thread) {
610         return thread == this.thread;
611     }
612 
613     /**
614      * Add a {@link Runnable} which will be executed on shutdown of this instance
615      */
616     public void addShutdownHook(final Runnable task) {
617         if (inEventLoop()) {
618             shutdownHooks.add(task);
619         } else {
620             execute(new Runnable() {
621                 @Override
622                 public void run() {
623                     shutdownHooks.add(task);
624                 }
625             });
626         }
627     }
628 
629     /**
630      * Remove a previous added {@link Runnable} as a shutdown hook
631      */
632     public void removeShutdownHook(final Runnable task) {
633         if (inEventLoop()) {
634             shutdownHooks.remove(task);
635         } else {
636             execute(new Runnable() {
637                 @Override
638                 public void run() {
639                     shutdownHooks.remove(task);
640                 }
641             });
642         }
643     }
644 
645     private boolean runShutdownHooks() {
646         boolean ran = false;
647         // Note shutdown hooks can add / remove shutdown hooks.
648         while (!shutdownHooks.isEmpty()) {
649             List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
650             shutdownHooks.clear();
651             for (Runnable task: copy) {
652                 try {
653                     runTask(task);
654                 } catch (Throwable t) {
655                     logger.warn("Shutdown hook raised an exception.", t);
656                 } finally {
657                     ran = true;
658                 }
659             }
660         }
661 
662         if (ran) {
663             lastExecutionTime = getCurrentTimeNanos();
664         }
665 
666         return ran;
667     }
668 
669     @Override
670     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
671         ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
672         if (timeout < quietPeriod) {
673             throw new IllegalArgumentException(
674                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
675         }
676         ObjectUtil.checkNotNull(unit, "unit");
677 
678         if (isShuttingDown()) {
679             return terminationFuture();
680         }
681 
682         boolean inEventLoop = inEventLoop();
683         boolean wakeup;
684         int oldState;
685         for (;;) {
686             if (isShuttingDown()) {
687                 return terminationFuture();
688             }
689             int newState;
690             wakeup = true;
691             oldState = state;
692             if (inEventLoop) {
693                 newState = ST_SHUTTING_DOWN;
694             } else {
695                 switch (oldState) {
696                     case ST_NOT_STARTED:
697                     case ST_STARTED:
698                     case ST_SUSPENDING:
699                     case ST_SUSPENDED:
700                         newState = ST_SHUTTING_DOWN;
701                         break;
702                     default:
703                         newState = oldState;
704                         wakeup = false;
705                 }
706             }
707             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
708                 break;
709             }
710         }
711         gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
712         gracefulShutdownTimeout = unit.toNanos(timeout);
713 
714         if (ensureThreadStarted(oldState)) {
715             return terminationFuture;
716         }
717 
718         if (wakeup) {
719             taskQueue.offer(WAKEUP_TASK);
720             if (!addTaskWakesUp) {
721                 wakeup(inEventLoop);
722             }
723         }
724 
725         return terminationFuture();
726     }
727 
728     @Override
729     public Future<?> terminationFuture() {
730         return terminationFuture;
731     }
732 
733     @Override
734     @Deprecated
735     public void shutdown() {
736         if (isShutdown()) {
737             return;
738         }
739 
740         boolean inEventLoop = inEventLoop();
741         boolean wakeup;
742         int oldState;
743         for (;;) {
744             if (isShuttingDown()) {
745                 return;
746             }
747             int newState;
748             wakeup = true;
749             oldState = state;
750             if (inEventLoop) {
751                 newState = ST_SHUTDOWN;
752             } else {
753                 switch (oldState) {
754                     case ST_NOT_STARTED:
755                     case ST_STARTED:
756                     case ST_SUSPENDING:
757                     case ST_SUSPENDED:
758                     case ST_SHUTTING_DOWN:
759                         newState = ST_SHUTDOWN;
760                         break;
761                     default:
762                         newState = oldState;
763                         wakeup = false;
764                 }
765             }
766             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
767                 break;
768             }
769         }
770 
771         if (ensureThreadStarted(oldState)) {
772             return;
773         }
774 
775         if (wakeup) {
776             taskQueue.offer(WAKEUP_TASK);
777             if (!addTaskWakesUp) {
778                 wakeup(inEventLoop);
779             }
780         }
781     }
782 
783     @Override
784     public boolean isShuttingDown() {
785         return state >= ST_SHUTTING_DOWN;
786     }
787 
788     @Override
789     public boolean isShutdown() {
790         return state >= ST_SHUTDOWN;
791     }
792 
793     @Override
794     public boolean isTerminated() {
795         return state == ST_TERMINATED;
796     }
797 
798     @Override
799     public boolean isSuspended() {
800         int currentState = state;
801         return currentState == ST_SUSPENDED || currentState == ST_SUSPENDING;
802     }
803 
804     @Override
805     public boolean trySuspend() {
806         if (supportSuspension) {
807             if (STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_SUSPENDING)) {
808                 wakeup(inEventLoop());
809                 return true;
810             }
811             int currentState = state;
812             return currentState == ST_SUSPENDED || currentState == ST_SUSPENDING;
813         }
814         return false;
815     }
816 
817     /**
818      * Returns {@code true} if this {@link SingleThreadEventExecutor} can be suspended at the moment, {@code false}
819      * otherwise.
820      *
821      * @return  if suspension is possible at the moment.
822      */
823     protected boolean canSuspend() {
824         return canSuspend(state);
825     }
826 
827     /**
828      * Returns {@code true} if this {@link SingleThreadEventExecutor} can be suspended at the moment, {@code false}
829      * otherwise.
830      *
831      * Subclasses might override this method to add extra checks.
832      *
833      * @param   state   the current internal state of the {@link SingleThreadEventExecutor}.
834      * @return          if suspension is possible at the moment.
835      */
836     protected boolean canSuspend(int state) {
837         assert inEventLoop();
838         return supportSuspension && (state == ST_SUSPENDED || state == ST_SUSPENDING)
839                 && !hasTasks() && nextScheduledTaskDeadlineNanos() == -1;
840     }
841 
842     /**
843      * Confirm that the shutdown if the instance should be done now!
844      */
845     protected boolean confirmShutdown() {
846         if (!isShuttingDown()) {
847             return false;
848         }
849 
850         if (!inEventLoop()) {
851             throw new IllegalStateException("must be invoked from an event loop");
852         }
853 
854         cancelScheduledTasks();
855 
856         if (gracefulShutdownStartTime == 0) {
857             gracefulShutdownStartTime = getCurrentTimeNanos();
858         }
859 
860         if (runAllTasks() || runShutdownHooks()) {
861             if (isShutdown()) {
862                 // Executor shut down - no new tasks anymore.
863                 return true;
864             }
865 
866             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
867             // terminate if the quiet period is 0.
868             // See https://github.com/netty/netty/issues/4241
869             if (gracefulShutdownQuietPeriod == 0) {
870                 return true;
871             }
872             taskQueue.offer(WAKEUP_TASK);
873             return false;
874         }
875 
876         final long nanoTime = getCurrentTimeNanos();
877 
878         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
879             return true;
880         }
881 
882         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
883             // Check if any tasks were added to the queue every 100ms.
884             // TODO: Change the behavior of takeTask() so that it returns on timeout.
885             taskQueue.offer(WAKEUP_TASK);
886             try {
887                 Thread.sleep(100);
888             } catch (InterruptedException e) {
889                 // Ignore
890             }
891 
892             return false;
893         }
894 
895         // No tasks were added for last quiet period - hopefully safe to shut down.
896         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
897         return true;
898     }
899 
900     @Override
901     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
902         ObjectUtil.checkNotNull(unit, "unit");
903         if (inEventLoop()) {
904             throw new IllegalStateException("cannot await termination of the current thread");
905         }
906 
907         threadLock.await(timeout, unit);
908 
909         return isTerminated();
910     }
911 
912     @Override
913     public void execute(Runnable task) {
914         execute0(task);
915     }
916 
917     @Override
918     public void lazyExecute(Runnable task) {
919         lazyExecute0(task);
920     }
921 
922     private void execute0(@Schedule Runnable task) {
923         ObjectUtil.checkNotNull(task, "task");
924         execute(task, wakesUpForTask(task));
925     }
926 
927     private void lazyExecute0(@Schedule Runnable task) {
928         execute(ObjectUtil.checkNotNull(task, "task"), false);
929     }
930 
931     @Override
932     void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
933         ObjectUtil.checkNotNull(task, "task");
934         int currentState = state;
935         if (supportSuspension && currentState == ST_SUSPENDED) {
936             // In the case of scheduling for removal we need to also ensure we will recover the "suspend" state
937             // after it if it was set before. Otherwise we will always end up "unsuspending" things on cancellation
938             // which is not optimal.
939             execute(new Runnable() {
940                 @Override
941                 public void run() {
942                     task.run();
943                     if (canSuspend(ST_SUSPENDED)) {
944                         // Try suspending again to recover the state before we submitted the new task that will
945                         // handle cancellation itself.
946                         trySuspend();
947                     }
948                 }
949             }, true);
950         } else {
951             // task will remove itself from scheduled task queue when it runs
952             execute(task, false);
953         }
954     }
955 
956     private void execute(Runnable task, boolean immediate) {
957         boolean inEventLoop = inEventLoop();
958         addTask(task);
959         if (!inEventLoop) {
960             startThread();
961             if (isShutdown()) {
962                 boolean reject = false;
963                 try {
964                     if (removeTask(task)) {
965                         reject = true;
966                     }
967                 } catch (UnsupportedOperationException e) {
968                     // The task queue does not support removal so the best thing we can do is to just move on and
969                     // hope we will be able to pick-up the task before its completely terminated.
970                     // In worst case we will log on termination.
971                 }
972                 if (reject) {
973                     reject();
974                 }
975             }
976         }
977 
978         if (!addTaskWakesUp && immediate) {
979             wakeup(inEventLoop);
980         }
981     }
982 
983     @Override
984     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
985         throwIfInEventLoop("invokeAny");
986         return super.invokeAny(tasks);
987     }
988 
989     @Override
990     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
991             throws InterruptedException, ExecutionException, TimeoutException {
992         throwIfInEventLoop("invokeAny");
993         return super.invokeAny(tasks, timeout, unit);
994     }
995 
996     @Override
997     public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
998             throws InterruptedException {
999         throwIfInEventLoop("invokeAll");
1000         return super.invokeAll(tasks);
1001     }
1002 
1003     @Override
1004     public <T> List<java.util.concurrent.Future<T>> invokeAll(
1005             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
1006         throwIfInEventLoop("invokeAll");
1007         return super.invokeAll(tasks, timeout, unit);
1008     }
1009 
1010     private void throwIfInEventLoop(String method) {
1011         if (inEventLoop()) {
1012             throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
1013         }
1014     }
1015 
1016     /**
1017      * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}.
1018      * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until
1019      * it is fully started.
1020      */
1021     public final ThreadProperties threadProperties() {
1022         ThreadProperties threadProperties = this.threadProperties;
1023         if (threadProperties == null) {
1024             Thread thread = this.thread;
1025             if (thread == null) {
1026                 assert !inEventLoop();
1027                 submit(NOOP_TASK).syncUninterruptibly();
1028                 thread = this.thread;
1029                 assert thread != null;
1030             }
1031 
1032             threadProperties = new DefaultThreadProperties(thread);
1033             if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
1034                 threadProperties = this.threadProperties;
1035             }
1036         }
1037 
1038         return threadProperties;
1039     }
1040 
1041     /**
1042      * @deprecated override {@link SingleThreadEventExecutor#wakesUpForTask} to re-create this behaviour
1043      */
1044     @Deprecated
1045     protected interface NonWakeupRunnable extends LazyRunnable { }
1046 
1047     /**
1048      * Can be overridden to control which tasks require waking the {@link EventExecutor} thread
1049      * if it is waiting so that they can be run immediately.
1050      */
1051     protected boolean wakesUpForTask(Runnable task) {
1052         return true;
1053     }
1054 
1055     protected static void reject() {
1056         throw new RejectedExecutionException("event executor terminated");
1057     }
1058 
1059     /**
1060      * Offers the task to the associated {@link RejectedExecutionHandler}.
1061      *
1062      * @param task to reject.
1063      */
1064     protected final void reject(Runnable task) {
1065         rejectedExecutionHandler.rejected(task, this);
1066     }
1067 
1068     // ScheduledExecutorService implementation
1069 
1070     private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
1071 
1072     private void startThread() {
1073         int currentState = state;
1074         if (currentState == ST_NOT_STARTED || currentState == ST_SUSPENDED) {
1075             if (STATE_UPDATER.compareAndSet(this, currentState, ST_STARTED)) {
1076                 boolean success = false;
1077                 try {
1078                     doStartThread();
1079                     success = true;
1080                 } finally {
1081                     if (!success) {
1082                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
1083                     }
1084                 }
1085             }
1086         }
1087     }
1088 
1089     private boolean ensureThreadStarted(int oldState) {
1090         if (oldState == ST_NOT_STARTED || oldState == ST_SUSPENDED) {
1091             try {
1092                 doStartThread();
1093             } catch (Throwable cause) {
1094                 STATE_UPDATER.set(this, ST_TERMINATED);
1095                 terminationFuture.tryFailure(cause);
1096 
1097                 if (!(cause instanceof Exception)) {
1098                     // Also rethrow as it may be an OOME for example
1099                     PlatformDependent.throwException(cause);
1100                 }
1101                 return true;
1102             }
1103         }
1104         return false;
1105     }
1106 
1107     private void doStartThread() {
1108         executor.execute(new Runnable() {
1109             @Override
1110             public void run() {
1111                 processingLock.lock();
1112                 assert thread == null;
1113                 thread = Thread.currentThread();
1114                 if (interrupted) {
1115                     thread.interrupt();
1116                     interrupted = false;
1117                 }
1118                 boolean success = false;
1119                 updateLastExecutionTime();
1120                 boolean suspend = false;
1121                 try {
1122                     for (;;) {
1123                         SingleThreadEventExecutor.this.run();
1124                         success = true;
1125 
1126                         int currentState = state;
1127                         if (canSuspend(currentState)) {
1128                             if (!STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
1129                                     ST_SUSPENDING, ST_SUSPENDED)) {
1130                                 // Try again as the CAS failed.
1131                                 continue;
1132                             }
1133 
1134                             if (!canSuspend(ST_SUSPENDED) && STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
1135                                         ST_SUSPENDED, ST_STARTED)) {
1136                                 // Seems like there was something added to the task queue again in the meantime but we
1137                                 // were able to re-engage this thread as the event loop thread.
1138                                 continue;
1139                             }
1140                             suspend = true;
1141                         }
1142                         break;
1143                     }
1144                 } catch (Throwable t) {
1145                     logger.warn("Unexpected exception from an event executor: ", t);
1146                 } finally {
1147                     boolean shutdown = !suspend;
1148                     if (shutdown) {
1149                         for (;;) {
1150                             // We are re-fetching the state as it might have been shutdown in the meantime.
1151                             int oldState = state;
1152                             if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
1153                                     SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
1154                                 break;
1155                             }
1156                         }
1157                         if (success && gracefulShutdownStartTime == 0) {
1158                             // Check if confirmShutdown() was called at the end of the loop.
1159                             if (logger.isErrorEnabled()) {
1160                                 logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
1161                                         SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
1162                                         "be called before run() implementation terminates.");
1163                             }
1164                         }
1165                     }
1166 
1167                     try {
1168                         if (shutdown) {
1169                             // Run all remaining tasks and shutdown hooks. At this point the event loop
1170                             // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
1171                             // graceful shutdown with quietPeriod.
1172                             for (;;) {
1173                                 if (confirmShutdown()) {
1174                                     break;
1175                                 }
1176                             }
1177 
1178                             // Now we want to make sure no more tasks can be added from this point. This is
1179                             // achieved by switching the state. Any new tasks beyond this point will be rejected.
1180                             for (;;) {
1181                                 int currentState = state;
1182                                 if (currentState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
1183                                         SingleThreadEventExecutor.this, currentState, ST_SHUTDOWN)) {
1184                                     break;
1185                                 }
1186                             }
1187 
1188                             // We have the final set of tasks in the queue now, no more can be added, run all remaining.
1189                             // No need to loop here, this is the final pass.
1190                             confirmShutdown();
1191                         }
1192                     } finally {
1193                         try {
1194                             if (shutdown) {
1195                                 try {
1196                                     cleanup();
1197                                 } finally {
1198                                     // Lets remove all FastThreadLocals for the Thread as we are about to terminate and
1199                                     // notify the future. The user may block on the future and once it unblocks the JVM
1200                                     // may terminate and start unloading classes.
1201                                     // See https://github.com/netty/netty/issues/6596.
1202                                     FastThreadLocal.removeAll();
1203 
1204                                     STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
1205                                     threadLock.countDown();
1206                                     int numUserTasks = drainTasks();
1207                                     if (numUserTasks > 0 && logger.isWarnEnabled()) {
1208                                         logger.warn("An event executor terminated with " +
1209                                                 "non-empty task queue (" + numUserTasks + ')');
1210                                     }
1211                                     terminationFuture.setSuccess(null);
1212                                 }
1213                             } else {
1214                                 // Lets remove all FastThreadLocals for the Thread as we are about to terminate it.
1215                                 FastThreadLocal.removeAll();
1216 
1217                                 // Reset the stored threadProperties in case of suspension.
1218                                 threadProperties = null;
1219                             }
1220                         } finally {
1221                             thread = null;
1222                             // Let the next thread take over if needed.
1223                             processingLock.unlock();
1224                         }
1225                     }
1226                 }
1227             }
1228         });
1229     }
1230 
1231     final int drainTasks() {
1232         int numTasks = 0;
1233         for (;;) {
1234             Runnable runnable = taskQueue.poll();
1235             if (runnable == null) {
1236                 break;
1237             }
1238             // WAKEUP_TASK should be just discarded as these are added internally.
1239             // The important bit is that we not have any user tasks left.
1240             if (WAKEUP_TASK != runnable) {
1241                 numTasks++;
1242             }
1243         }
1244         return numTasks;
1245     }
1246 
1247     private static final class DefaultThreadProperties implements ThreadProperties {
1248         private final Thread t;
1249 
1250         DefaultThreadProperties(Thread t) {
1251             this.t = t;
1252         }
1253 
1254         @Override
1255         public State state() {
1256             return t.getState();
1257         }
1258 
1259         @Override
1260         public int priority() {
1261             return t.getPriority();
1262         }
1263 
1264         @Override
1265         public boolean isInterrupted() {
1266             return t.isInterrupted();
1267         }
1268 
1269         @Override
1270         public boolean isDaemon() {
1271             return t.isDaemon();
1272         }
1273 
1274         @Override
1275         public String name() {
1276             return t.getName();
1277         }
1278 
1279         @Override
1280         public long id() {
1281             return t.getId();
1282         }
1283 
1284         @Override
1285         public StackTraceElement[] stackTrace() {
1286             return t.getStackTrace();
1287         }
1288 
1289         @Override
1290         public boolean isAlive() {
1291             return t.isAlive();
1292         }
1293     }
1294 }