View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.ThreadExecutorMap;
22  import io.netty.util.internal.logging.InternalLogger;
23  import io.netty.util.internal.logging.InternalLoggerFactory;
24  import org.jetbrains.annotations.Async.Schedule;
25  
26  import java.lang.Thread.State;
27  import java.util.ArrayList;
28  import java.util.Collection;
29  import java.util.LinkedHashSet;
30  import java.util.List;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.BlockingQueue;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.ThreadFactory;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.TimeoutException;
43  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
45  import java.util.concurrent.locks.Lock;
46  import java.util.concurrent.locks.ReentrantLock;
47  
48  /**
 * Abstract base class for {@link OrderedEventExecutor}s that execute all their submitted tasks in a single thread.
50   *
51   */
52  public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
53  
    /**
     * Default upper bound for the number of pending tasks. Read from the system property
     * {@code io.netty.eventexecutor.maxPendingTasks} (with {@link Integer#MAX_VALUE} passed as the default)
     * and clamped so that it is never smaller than 16.
     */
    static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);

    // Lifecycle states stored in the 'state' field. The numeric ordering matters: isShuttingDown() and
    // isShutdown() are implemented as ">=" comparisons against these values.
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_SUSPENDING = 2;
    private static final int ST_SUSPENDED = 3;
    private static final int ST_STARTED = 4;
    private static final int ST_SHUTTING_DOWN = 5;
    private static final int ST_SHUTDOWN = 6;
    private static final int ST_TERMINATED = 7;

    // A task whose run() method intentionally does nothing.
    private static final Runnable NOOP_TASK = new Runnable() {
        @Override
        public void run() {
            // Do nothing.
        }
    };

    // Atomic accessors for the volatile 'state' and 'threadProperties' fields declared below.
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
    // Queue of submitted tasks; either created through newTaskQueue(int) or supplied by the caller.
    private final Queue<Runnable> taskQueue;

    // The thread compared against in inEventLoop(Thread); interruptThread() treats null as "no thread yet".
    private volatile Thread thread;
    // Referenced by name from PROPERTIES_UPDATER.
    @SuppressWarnings("unused")
    private volatile ThreadProperties threadProperties;
    // The supplied Executor after being passed through ThreadExecutorMap.apply(executor, this).
    private final Executor executor;
    // Set by interruptThread() when it is called while 'thread' is still null.
    private volatile boolean interrupted;

    private final Lock processingLock = new ReentrantLock();
    private final CountDownLatch threadLock = new CountDownLatch(1);
    // Hooks run by runShutdownHooks(); insertion-ordered. Only modified from the event loop
    // (addShutdownHook/removeShutdownHook hop onto it via execute(...) when called from elsewhere).
    private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
    private final boolean addTaskWakesUp;
    private final int maxPendingTasks;
    private final RejectedExecutionHandler rejectedExecutionHandler;
    private final boolean supportSuspension;

    // Value of getCurrentTimeNanos() recorded after tasks or shutdown hooks were run.
    private long lastExecutionTime;

    // Current lifecycle state (one of the ST_* constants); updated through STATE_UPDATER.
    @SuppressWarnings({ "FieldMayBeFinal", "unused" })
    private volatile int state = ST_NOT_STARTED;

    // Graceful-shutdown timings in nanoseconds, written by shutdown0(...).
    private volatile long gracefulShutdownQuietPeriod;
    private volatile long gracefulShutdownTimeout;
    // Initialised on the first confirmShutdown() call that finds it still 0.
    private long gracefulShutdownStartTime;

    // The future handed out by terminationFuture().
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
106 
107     /**
108      * Create a new instance
109      *
110      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
111      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
112      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
113      *                          executor thread
114      */
115     protected SingleThreadEventExecutor(
116             EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
117         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
118     }
119 
120     /**
121      * Create a new instance
122      *
123      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
124      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
125      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
126      *                          executor thread
127      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
128      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
129      */
130     protected SingleThreadEventExecutor(
131             EventExecutorGroup parent, ThreadFactory threadFactory,
132             boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
133         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
134     }
135 
136     /**
137      * Create a new instance
138      *
139      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
140      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
141      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
142      *                          executor thread
143      * @param supportSuspension {@code true} if suspension of this {@link SingleThreadEventExecutor} is supported.
144      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
145      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
146      */
147     protected SingleThreadEventExecutor(
148             EventExecutorGroup parent, ThreadFactory threadFactory,
149             boolean addTaskWakesUp, boolean supportSuspension,
150             int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
151         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, supportSuspension,
152                 maxPendingTasks, rejectedHandler);
153     }
154 
155     /**
156      * Create a new instance
157      *
158      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
159      * @param executor          the {@link Executor} which will be used for executing
160      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
161      *                          executor thread
162      */
163     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
164         this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
165     }
166 
167     /**
168      * Create a new instance
169      *
170      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
171      * @param executor          the {@link Executor} which will be used for executing
172      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
173      *                          executor thread
174      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
175      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
176      */
177     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
178                                         boolean addTaskWakesUp, int maxPendingTasks,
179                                         RejectedExecutionHandler rejectedHandler) {
180         this(parent, executor, addTaskWakesUp, false, maxPendingTasks, rejectedHandler);
181     }
182 
183     /**
184      * Create a new instance
185      *
186      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
187      * @param executor          the {@link Executor} which will be used for executing
188      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
189      *                          executor thread
190      * @param supportSuspension {@code true} if suspension of this {@link SingleThreadEventExecutor} is supported.
191      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
192      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
193      */
194     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
195                                         boolean addTaskWakesUp, boolean supportSuspension,
196                                         int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
197         super(parent);
198         this.addTaskWakesUp = addTaskWakesUp;
199         this.supportSuspension = supportSuspension;
200         this.maxPendingTasks = Math.max(16, maxPendingTasks);
201         this.executor = ThreadExecutorMap.apply(executor, this);
202         taskQueue = newTaskQueue(this.maxPendingTasks);
203         rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
204     }
205 
206     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
207                                         boolean addTaskWakesUp, Queue<Runnable> taskQueue,
208                                         RejectedExecutionHandler rejectedHandler) {
209         this(parent, executor, addTaskWakesUp, false, taskQueue, rejectedHandler);
210     }
211 
212     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
213                                         boolean addTaskWakesUp, boolean supportSuspension,
214                                         Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
215         super(parent);
216         this.addTaskWakesUp = addTaskWakesUp;
217         this.supportSuspension = supportSuspension;
218         this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
219         this.executor = ThreadExecutorMap.apply(executor, this);
220         this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
221         this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
222     }
223 
224     /**
225      * @deprecated Please use and override {@link #newTaskQueue(int)}.
226      */
227     @Deprecated
228     protected Queue<Runnable> newTaskQueue() {
229         return newTaskQueue(maxPendingTasks);
230     }
231 
232     /**
233      * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
234      * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
235      * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
236      * implementation that does not support blocking operations at all.
237      */
238     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
239         return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
240     }
241 
242     /**
243      * Interrupt the current running {@link Thread}.
244      */
245     protected void interruptThread() {
246         Thread currentThread = thread;
247         if (currentThread == null) {
248             interrupted = true;
249         } else {
250             currentThread.interrupt();
251         }
252     }
253 
254     /**
255      * @see Queue#poll()
256      */
257     protected Runnable pollTask() {
258         assert inEventLoop();
259         return pollTaskFrom(taskQueue);
260     }
261 
262     protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
263         for (;;) {
264             Runnable task = taskQueue.poll();
265             if (task != WAKEUP_TASK) {
266                 return task;
267             }
268         }
269     }
270 
    /**
     * Take the next {@link Runnable} from the task queue, blocking if no task is currently present.
     * <p>
     * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue
     * does not implement {@link BlockingQueue}.
     * </p>
     * <p>
     * While a scheduled task is pending, the wait on the task queue is bounded by that task's remaining delay.
     * </p>
     *
     * @return the next task, or {@code null} if the executor thread has been interrupted or woken up
     *         (that is, a {@code WAKEUP_TASK} was dequeued).
     * @throws UnsupportedOperationException if the task queue is not a {@link BlockingQueue}
     */
    protected Runnable takeTask() {
        assert inEventLoop();
        if (!(taskQueue instanceof BlockingQueue)) {
            throw new UnsupportedOperationException();
        }

        BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
        for (;;) {
            ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
            if (scheduledTask == null) {
                // Nothing is scheduled: block until a task (or the wake-up marker) is offered.
                Runnable task = null;
                try {
                    task = taskQueue.take();
                    if (task == WAKEUP_TASK) {
                        // The marker only exists to unblock take(); report it as "no task".
                        task = null;
                    }
                } catch (InterruptedException e) {
                    // Deliberately ignored: 'task' is still null, so the caller sees an interruption as null.
                }
                return task;
            } else {
                // A scheduled task is pending: wait for a regular task no longer than its remaining delay.
                long delayNanos = scheduledTask.delayNanos();
                Runnable task = null;
                if (delayNanos > 0) {
                    try {
                        task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException e) {
                        // Woken up.
                        return null;
                    }
                }
                if (task == null) {
                    // We need to fetch the scheduled tasks now as otherwise there may be a chance that
                    // scheduled tasks are never executed if there is always one task in the taskQueue.
                    // This is for example true for the read task of OIO Transport
                    // See https://github.com/netty/netty/issues/1614
                    fetchFromScheduledTaskQueue();
                    task = taskQueue.poll();
                }

                if (task != null) {
                    if (task == WAKEUP_TASK) {
                        return null;
                    }
                    return task;
                }
                // Still nothing to run: loop and re-evaluate the scheduled task queue.
            }
        }
    }
329 
330     private boolean fetchFromScheduledTaskQueue() {
331         return fetchFromScheduledTaskQueue(taskQueue);
332     }
333 
334     /**
335      * @return {@code true} if at least one scheduled task was executed.
336      */
337     private boolean executeExpiredScheduledTasks() {
338         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
339             return false;
340         }
341         long nanoTime = getCurrentTimeNanos();
342         Runnable scheduledTask = pollScheduledTask(nanoTime);
343         if (scheduledTask == null) {
344             return false;
345         }
346         do {
347             safeExecute(scheduledTask);
348         } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
349         return true;
350     }
351 
352     /**
353      * @see Queue#peek()
354      */
355     protected Runnable peekTask() {
356         assert inEventLoop();
357         return taskQueue.peek();
358     }
359 
360     /**
361      * @see Queue#isEmpty()
362      */
363     protected boolean hasTasks() {
364         assert inEventLoop();
365         return !taskQueue.isEmpty();
366     }
367 
368     /**
369      * Return the number of tasks that are pending for processing.
370      */
371     public int pendingTasks() {
372         return taskQueue.size();
373     }
374 
375     /**
376      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
377      * before.
378      */
379     protected void addTask(Runnable task) {
380         ObjectUtil.checkNotNull(task, "task");
381         if (!offerTask(task)) {
382             reject(task);
383         }
384     }
385 
386     final boolean offerTask(Runnable task) {
387         if (isShutdown()) {
388             reject();
389         }
390         return taskQueue.offer(task);
391     }
392 
393     /**
394      * @see Queue#remove(Object)
395      */
396     protected boolean removeTask(Runnable task) {
397         return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
398     }
399 
400     /**
401      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
402      *
403      * @return {@code true} if and only if at least one task was run
404      */
405     protected boolean runAllTasks() {
406         assert inEventLoop();
407         boolean fetchedAll;
408         boolean ranAtLeastOne = false;
409 
410         do {
411             fetchedAll = fetchFromScheduledTaskQueue(taskQueue);
412             if (runAllTasksFrom(taskQueue)) {
413                 ranAtLeastOne = true;
414             }
415         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
416 
417         if (ranAtLeastOne) {
418             lastExecutionTime = getCurrentTimeNanos();
419         }
420         afterRunningAllTasks();
421         return ranAtLeastOne;
422     }
423 
424     /**
425      * Execute all expired scheduled tasks and all current tasks in the executor queue until both queues are empty,
426      * or {@code maxDrainAttempts} has been exceeded.
427      * @param maxDrainAttempts The maximum amount of times this method attempts to drain from queues. This is to prevent
428      *                         continuous task execution and scheduling from preventing the EventExecutor thread to
429      *                         make progress and return to the selector mechanism to process inbound I/O events.
430      * @return {@code true} if at least one task was run.
431      */
432     protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
433         assert inEventLoop();
434         boolean ranAtLeastOneTask;
435         int drainAttempt = 0;
436         do {
437             // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
438             // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
439             ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
440         } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
441 
442         if (drainAttempt > 0) {
443             lastExecutionTime = getCurrentTimeNanos();
444         }
445         afterRunningAllTasks();
446 
447         return drainAttempt > 0;
448     }
449 
450     /**
451      * Runs all tasks from the passed {@code taskQueue}.
452      *
453      * @param taskQueue To poll and execute all tasks.
454      *
455      * @return {@code true} if at least one task was executed.
456      */
457     protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
458         Runnable task = pollTaskFrom(taskQueue);
459         if (task == null) {
460             return false;
461         }
462         for (;;) {
463             safeExecute(task);
464             task = pollTaskFrom(taskQueue);
465             if (task == null) {
466                 return true;
467             }
468         }
469     }
470 
471     /**
472      * What ever tasks are present in {@code taskQueue} when this method is invoked will be {@link Runnable#run()}.
473      * @param taskQueue the task queue to drain.
474      * @return {@code true} if at least {@link Runnable#run()} was called.
475      */
476     private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
477         Runnable task = pollTaskFrom(taskQueue);
478         if (task == null) {
479             return false;
480         }
481         int remaining = Math.min(maxPendingTasks, taskQueue.size());
482         safeExecute(task);
483         // Use taskQueue.poll() directly rather than pollTaskFrom() since the latter may
484         // silently consume more than one item from the queue (skips over WAKEUP_TASK instances)
485         while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
486             safeExecute(task);
487         }
488         return true;
489     }
490 
    /**
     * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
     * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
     * <p>
     * Scheduled tasks are transferred into the task queue once, before the first task is polled. The elapsed
     * time is only checked after every 64th executed task, so the time limit may be overshot.
     * </p>
     *
     * @param timeoutNanos the time budget in nanoseconds; for values {@code <= 0} the deadline is set to {@code 0}
     * @return {@code true} if at least one task was run, {@code false} if the task queue was empty
     */
    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue(taskQueue);
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        // NOTE(review): with a deadline of 0 the first 64-task checkpoint ends the loop as long as
        // getCurrentTimeNanos() is non-negative - confirm against its definition.
        final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
        long runTasks = 0;
        // Local shadow of the field; it is assigned on both exits of the loop and published afterwards.
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = getCurrentTimeNanos();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                // Queue drained before the deadline check fired.
                lastExecutionTime = getCurrentTimeNanos();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
531 
532     /**
533      * Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}.
534      */
535     protected void afterRunningAllTasks() { }
536 
537     /**
538      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
539      */
540     protected long delayNanos(long currentTimeNanos) {
541         currentTimeNanos -= initialNanoTime();
542 
543         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
544         if (scheduledTask == null) {
545             return SCHEDULE_PURGE_INTERVAL;
546         }
547 
548         return scheduledTask.delayNanos(currentTimeNanos);
549     }
550 
551     /**
552      * Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()}) at which the next
553      * closest scheduled task should run.
554      */
555     protected long deadlineNanos() {
556         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
557         if (scheduledTask == null) {
558             return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
559         }
560         return scheduledTask.deadlineNanos();
561     }
562 
563     /**
564      * Updates the internal timestamp that tells when a submitted task was executed most recently.
565      * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
566      * usually no need to call this method.  However, if you take the tasks manually using {@link #takeTask()} or
567      * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
568      * checks.
569      */
570     protected void updateLastExecutionTime() {
571         lastExecutionTime = getCurrentTimeNanos();
572     }
573 
    /**
     * Run the tasks in the {@link #taskQueue}. Implemented by sub-classes; the task accessors they are
     * expected to use ({@link #takeTask()}, {@link #pollTask()}, {@link #runAllTasks()}) assert that they
     * are called from the event loop.
     */
    protected abstract void run();
578 
579     /**
580      * Do nothing, sub-classes may override
581      */
582     protected void cleanup() {
583         // NOOP
584     }
585 
586     protected void wakeup(boolean inEventLoop) {
587         if (!inEventLoop) {
588             // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
589             // is already something in the queue.
590             taskQueue.offer(WAKEUP_TASK);
591         }
592     }
593 
594     @Override
595     public boolean inEventLoop(Thread thread) {
596         return thread == this.thread;
597     }
598 
599     /**
600      * Add a {@link Runnable} which will be executed on shutdown of this instance
601      */
602     public void addShutdownHook(final Runnable task) {
603         if (inEventLoop()) {
604             shutdownHooks.add(task);
605         } else {
606             execute(new Runnable() {
607                 @Override
608                 public void run() {
609                     shutdownHooks.add(task);
610                 }
611             });
612         }
613     }
614 
615     /**
616      * Remove a previous added {@link Runnable} as a shutdown hook
617      */
618     public void removeShutdownHook(final Runnable task) {
619         if (inEventLoop()) {
620             shutdownHooks.remove(task);
621         } else {
622             execute(new Runnable() {
623                 @Override
624                 public void run() {
625                     shutdownHooks.remove(task);
626                 }
627             });
628         }
629     }
630 
631     private boolean runShutdownHooks() {
632         boolean ran = false;
633         // Note shutdown hooks can add / remove shutdown hooks.
634         while (!shutdownHooks.isEmpty()) {
635             List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
636             shutdownHooks.clear();
637             for (Runnable task: copy) {
638                 try {
639                     runTask(task);
640                 } catch (Throwable t) {
641                     logger.warn("Shutdown hook raised an exception.", t);
642                 } finally {
643                     ran = true;
644                 }
645             }
646         }
647 
648         if (ran) {
649             lastExecutionTime = getCurrentTimeNanos();
650         }
651 
652         return ran;
653     }
654 
    /**
     * Shared implementation of {@link #shutdownGracefully(long, long, TimeUnit)} and {@link #shutdown()}:
     * moves the executor to {@code shutdownState} with a compare-and-set loop, stores the graceful shutdown
     * timings and wakes up the executor thread. Does nothing if the executor is already shutting down.
     *
     * @param quietPeriod   the quiet period in nanoseconds, or {@code -1} to leave the stored value untouched
     * @param timeout       the shutdown timeout in nanoseconds, or {@code -1} to leave the stored value untouched
     * @param shutdownState the state to move to ({@code ST_SHUTTING_DOWN} or {@code ST_SHUTDOWN} for the callers
     *                      in this class)
     */
    private void shutdown0(long quietPeriod, long timeout, int shutdownState) {
        if (isShuttingDown()) {
            return;
        }

        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        for (;;) {
            // Re-checked on every retry: another thread may have started the shutdown in the meantime.
            if (isShuttingDown()) {
                return;
            }
            int newState;
            wakeup = true;
            oldState = state;
            if (inEventLoop) {
                // On the event loop the transition is applied regardless of the current state.
                newState = shutdownState;
            } else {
                switch (oldState) {
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                    case ST_SUSPENDING:
                    case ST_SUSPENDED:
                        newState = shutdownState;
                        break;
                    default:
                        // Any other state is kept as is and no wake-up is issued.
                        newState = oldState;
                        wakeup = false;
                }
            }
            // A failed CAS means the state changed concurrently; retry with the fresh value.
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                break;
            }
        }
        if (quietPeriod != -1) {
            gracefulShutdownQuietPeriod = quietPeriod;
        }
        if (timeout != -1) {
            gracefulShutdownTimeout = timeout;
        }

        // NOTE(review): ensureThreadStarted is defined outside this section; a true result ends the call
        // here without issuing the wake-up below - confirm its contract there.
        if (ensureThreadStarted(oldState)) {
            return;
        }

        if (wakeup) {
            taskQueue.offer(WAKEUP_TASK);
            if (!addTaskWakesUp) {
                wakeup(inEventLoop);
            }
        }
    }
707 
708     @Override
709     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
710         ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
711         if (timeout < quietPeriod) {
712             throw new IllegalArgumentException(
713                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
714         }
715         ObjectUtil.checkNotNull(unit, "unit");
716 
717         shutdown0(unit.toNanos(quietPeriod), unit.toNanos(timeout), ST_SHUTTING_DOWN);
718         return terminationFuture();
719     }
720 
721     @Override
722     public Future<?> terminationFuture() {
723         return terminationFuture;
724     }
725 
726     @Override
727     @Deprecated
728     public void shutdown() {
729         shutdown0(-1, -1, ST_SHUTDOWN);
730     }
731 
732     @Override
733     public boolean isShuttingDown() {
734         return state >= ST_SHUTTING_DOWN;
735     }
736 
737     @Override
738     public boolean isShutdown() {
739         return state >= ST_SHUTDOWN;
740     }
741 
742     @Override
743     public boolean isTerminated() {
744         return state == ST_TERMINATED;
745     }
746 
747     @Override
748     public boolean isSuspended() {
749         int currentState = state;
750         return currentState == ST_SUSPENDED || currentState == ST_SUSPENDING;
751     }
752 
753     @Override
754     public boolean trySuspend() {
755         if (supportSuspension) {
756             if (STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_SUSPENDING)) {
757                 wakeup(inEventLoop());
758                 return true;
759             }
760             int currentState = state;
761             return currentState == ST_SUSPENDED || currentState == ST_SUSPENDING;
762         }
763         return false;
764     }
765 
766     /**
767      * Returns {@code true} if this {@link SingleThreadEventExecutor} can be suspended at the moment, {@code false}
768      * otherwise.
         * <p>
         * Delegates to {@link #canSuspend(int)}, passing the current value of the internal state.
769      *
770      * @return  if suspension is possible at the moment.
771      */
772     protected boolean canSuspend() {
773         return canSuspend(state);
774     }
775 
776     /**
777      * Returns {@code true} if this {@link SingleThreadEventExecutor} can be suspended at the moment, {@code false}
778      * otherwise.
779      *
780      * Subclasses might override this method to add extra checks.
781      *
782      * @param   state   the current internal state of the {@link SingleThreadEventExecutor}.
783      * @return          if suspension is possible at the moment.
784      */
785     protected boolean canSuspend(int state) {
786         assert inEventLoop();
787         return supportSuspension && (state == ST_SUSPENDED || state == ST_SUSPENDING)
788                 && !hasTasks() && nextScheduledTaskDeadlineNanos() == -1;
789     }
790 
791     /**
792      * Confirms whether the shutdown of this instance can be completed now.
         * <p>
         * Cancels the scheduled tasks, runs the queued tasks and the shutdown hooks, and then uses the graceful
         * shutdown quiet period and timeout to decide. Once the executor is shutting down this must be called
         * from the event loop.
         *
         * @return {@code true} if the shutdown can be completed, {@code false} if the executor is not shutting
         *         down or if this method needs to be called again later.
         * @throws IllegalStateException if the executor is shutting down and the caller is not in the event loop.
793      */
794     protected boolean confirmShutdown() {
795         if (!isShuttingDown()) {
796             return false;
797         }
798 
799         if (!inEventLoop()) {
800             throw new IllegalStateException("must be invoked from an event loop");
801         }
802 
803         cancelScheduledTasks();
804 
            // A start time of 0 is treated as "not recorded yet": remember when the graceful shutdown began.
805         if (gracefulShutdownStartTime == 0) {
806             gracefulShutdownStartTime = getCurrentTimeNanos();
807         }
808 
            // Because of the short-circuit ||, runShutdownHooks() is only invoked when runAllTasks() returned false.
809         if (runAllTasks() || runShutdownHooks()) {
810             if (isShutdown()) {
811                 // Executor shut down - no new tasks anymore.
812                 return true;
813             }
814 
815             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
816             // terminate if the quiet period is 0.
817             // See https://github.com/netty/netty/issues/4241
818             if (gracefulShutdownQuietPeriod == 0) {
819                 return true;
820             }
821             taskQueue.offer(WAKEUP_TASK);
822             return false;
823         }
824 
825         final long nanoTime = getCurrentTimeNanos();
826 
            // Done when already shut down, or when more than gracefulShutdownTimeout has passed since the start.
827         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
828             return true;
829         }
830 
            // Still within the quiet period measured from lastExecutionTime: keep waiting and report "not yet".
831         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
832             // Check if any tasks were added to the queue every 100ms.
833             // TODO: Change the behavior of takeTask() so that it returns on timeout.
834             taskQueue.offer(WAKEUP_TASK);
835             try {
836                 Thread.sleep(100);
837             } catch (InterruptedException e) {
838                 // Ignore: being woken early only means false is returned sooner than after 100ms.
839             }
840 
841             return false;
842         }
843 
844         // No tasks were added for last quiet period - hopefully safe to shut down.
845         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
846         return true;
847     }
848 
849     @Override
850     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
851         ObjectUtil.checkNotNull(unit, "unit");
852         final boolean calledFromLoopThread = inEventLoop();
853         if (calledFromLoopThread) {
854             throw new IllegalStateException("cannot await termination of the current thread");
855         }
856         // Bounded wait on threadLock; the answer comes from the state, not from what await() returned.
857         threadLock.await(timeout, unit);
858         return isTerminated();
859     }
860 
861     @Override
862     public void execute(Runnable task) {
            // Thin wrapper: the work happens in execute0(), whose parameter carries the @Schedule annotation.
863         execute0(task);
864     }
865 
866     @Override
867     public void lazyExecute(Runnable task) {
            // Thin wrapper: lazyExecute0() null-checks the task and enqueues it with immediate == false.
868         lazyExecute0(task);
869     }
870 
871     private void execute0(@Schedule Runnable task) {
872         ObjectUtil.checkNotNull(task, "task");
873         execute(task, wakesUpForTask(task));
874     }
875 
876     private void lazyExecute0(@Schedule Runnable task) {
            // Null-check, then enqueue without requesting an immediate wakeup.
877         ObjectUtil.checkNotNull(task, "task");
            execute(task, false);
878     }
879 
880     @Override
881     void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
882         ObjectUtil.checkNotNull(task, "task");
883         int currentState = state;
884         if (supportSuspension && currentState == ST_SUSPENDED) {
885             // In the case of scheduling for removal we need to also ensure we will recover the "suspend" state
886             // after it if it was set before. Otherwise we will always end up "unsuspending" things on cancellation
887             // which is not optimal.
888             execute(new Runnable() {
889                 @Override
890                 public void run() {
891                     task.run();
892                     if (canSuspend(ST_SUSPENDED)) {
893                         // Try suspending again to recover the state before we submitted the new task that will
894                         // handle cancellation itself.
895                         trySuspend();
896                     }
897                 }
898             }, true);
899         } else {
900             // task will remove itself from scheduled task queue when it runs
901             execute(task, false);
902         }
903     }
904 
905     private void execute(Runnable task, boolean immediate) {
906         boolean inEventLoop = inEventLoop();
907         addTask(task);
908         if (!inEventLoop) {
909             startThread();
910             if (isShutdown()) {
911                 boolean reject = false;
912                 try {
913                     if (removeTask(task)) {
914                         reject = true;
915                     }
916                 } catch (UnsupportedOperationException e) {
917                     // The task queue does not support removal so the best thing we can do is to just move on and
918                     // hope we will be able to pick-up the task before its completely terminated.
919                     // In worst case we will log on termination.
920                 }
921                 if (reject) {
922                     reject();
923                 }
924             }
925         }
926 
927         if (!addTaskWakesUp && immediate) {
928             wakeup(inEventLoop);
929         }
930     }
931 
932     @Override
933     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
            // Refused with a RejectedExecutionException when called from the event loop (see throwIfInEventLoop);
            // otherwise the superclass implementation is used unchanged.
934         throwIfInEventLoop("invokeAny");
935         return super.invokeAny(tasks);
936     }
937 
938     @Override
939     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
940             throws InterruptedException, ExecutionException, TimeoutException {
            // Timed variant: same event-loop guard as the untimed invokeAny, then the superclass implementation.
941         throwIfInEventLoop("invokeAny");
942         return super.invokeAny(tasks, timeout, unit);
943     }
944 
945     @Override
946     public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
947             throws InterruptedException {
            // Refused with a RejectedExecutionException when called from the event loop (see throwIfInEventLoop);
            // otherwise the superclass implementation is used unchanged.
948         throwIfInEventLoop("invokeAll");
949         return super.invokeAll(tasks);
950     }
951 
952     @Override
953     public <T> List<java.util.concurrent.Future<T>> invokeAll(
954             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
            // Timed variant: same event-loop guard as the untimed invokeAll, then the superclass implementation.
955         throwIfInEventLoop("invokeAll");
956         return super.invokeAll(tasks, timeout, unit);
957     }
958 
959     private void throwIfInEventLoop(String method) {
960         if (!inEventLoop()) {
961             return;
962         }
            // The caller is the event loop itself: refuse, naming the offending method in the message.
            throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
963     }
964 
965     /**
966      * Provides the {@link ThreadProperties} of the {@link Thread} that powers this executor.
967      * When no thread is attached yet, a no-op task is submitted and awaited uninterruptibly first, which starts
968      * the {@link SingleThreadEventExecutor}. The created properties object is stored for later calls.
969      */
970     public final ThreadProperties threadProperties() {
971         final ThreadProperties cached = this.threadProperties;
972         if (cached != null) {
973             return cached;
974         }
975         Thread loopThread = this.thread;
976         if (loopThread == null) {
977             assert !inEventLoop();
978             submit(NOOP_TASK).syncUninterruptibly();
979             loopThread = this.thread;
980             assert loopThread != null;
981         }
982         final ThreadProperties created = new DefaultThreadProperties(loopThread);
983         if (PROPERTIES_UPDATER.compareAndSet(this, null, created)) {
984             return created;
985         }
986         // Another caller stored its instance first; hand out whatever the field holds now.
987         return this.threadProperties;
988     }
989 
990     /**
         * Marker interface: extends {@link LazyRunnable} without declaring any members of its own.
         *
991      * @deprecated override {@link SingleThreadEventExecutor#wakesUpForTask} to re-create this behaviour
992      */
993     @Deprecated
994     protected interface NonWakeupRunnable extends LazyRunnable { }
995 
996     /**
997      * Can be overridden to control which tasks require waking the {@link EventExecutor} thread
998      * if it is waiting so that they can be run immediately.
         * <p>
         * This implementation returns {@code true} for every task.
         *
         * @param task the task being passed to {@code execute}.
         * @return {@code true} if the executor should be woken up for {@code task}.
999      */
1000     protected boolean wakesUpForTask(Runnable task) {
1001         return true;
1002     }
1003 
1004     protected static void reject() {
              // Never returns normally: always throws a RejectedExecutionException with a fixed message.
1005         throw new RejectedExecutionException("event executor terminated");
1006     }
1007 
1008     /**
1009      * Offers the task to the associated {@link RejectedExecutionHandler}.
          * <p>
          * Calls {@code rejected(task, this)} on the handler; what happens to the task is up to that handler.
1010      *
1011      * @param task to reject.
1012      */
1013     protected final void reject(Runnable task) {
1014         rejectedExecutionHandler.rejected(task, this);
1015     }
1016 
1017     // ScheduledExecutorService implementation
1018 
1019     private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); // one second, in nanoseconds
1020 
1021     private void startThread() {
1022         int currentState = state;
1023         if (currentState == ST_NOT_STARTED || currentState == ST_SUSPENDED) {
1024             if (STATE_UPDATER.compareAndSet(this, currentState, ST_STARTED)) {
1025                 boolean success = false;
1026                 try {
1027                     doStartThread();
1028                     success = true;
1029                 } finally {
1030                     if (!success) {
1031                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
1032                     }
1033                 }
1034             }
1035         }
1036     }
1037 
1037     private boolean ensureThreadStarted(int oldState) {
1038         if (oldState != ST_NOT_STARTED && oldState != ST_SUSPENDED) {
1039             return false;
1040         }
1041         try {
1042             doStartThread();
1043         } catch (Throwable cause) {
1044             STATE_UPDATER.set(this, ST_TERMINATED);
1045             terminationFuture.tryFailure(cause);
1046             if (!(cause instanceof Exception)) {
1047                 // Anything that is not an Exception (an OOME for example) is rethrown as well.
1048                 PlatformDependent.throwException(cause);
1049             }
1050             return true;
1051         }
1052         return false;
1053     }
1055 
1056     private void doStartThread() {
              // Submits the event loop body to the backing executor. The submitted Runnable binds the current
              // thread, calls run() in a loop, and afterwards either parks the executor as suspended or walks it
              // through ST_SHUTTING_DOWN -> ST_SHUTDOWN -> ST_TERMINATED and completes the termination future.
1057         executor.execute(new Runnable() {
1058             @Override
1059             public void run() {
                      // Held until the innermost finally block below unlocks it.
1060                 processingLock.lock();
1061                 assert thread == null;
1062                 thread = Thread.currentThread();
                      // Deliver an interrupt that was recorded in the flag
                      // (presumably requested while no thread was bound - confirm against the code that sets it).
1063                 if (interrupted) {
1064                     thread.interrupt();
1065                     interrupted = false;
1066                 }
1067                 boolean success = false;
1068                 Throwable unexpectedException = null;
1069                 updateLastExecutionTime();
1070                 boolean suspend = false;
1071                 try {
                          // Each "continue" below re-enters run(); "break" leaves with suspend either set or not.
1072                     for (;;) {
1073                         SingleThreadEventExecutor.this.run();
1074                         success = true;
1075 
1076                         int currentState = state;
1077                         if (canSuspend(currentState)) {
1078                             if (!STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
1079                                     ST_SUSPENDING, ST_SUSPENDED)) {
1080                                 // Try again as the CAS failed.
1081                                 continue;
1082                             }
1083 
1084                             if (!canSuspend(ST_SUSPENDED) && STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
1085                                         ST_SUSPENDED, ST_STARTED)) {
1086                                 // Seems like there was something added to the task queue again in the meantime but we
1087                                 // were able to re-engage this thread as the event loop thread.
1088                                 continue;
1089                             }
1090                             suspend = true;
1091                         }
1092                         break;
1093                     }
1094                 } catch (Throwable t) {
1095                     unexpectedException = t;
1096                     logger.warn("Unexpected exception from an event executor: ", t);
1097                 } finally {
                          // Anything but a completed suspension (normal return from run() or an exception) is
                          // handled as a shutdown.
1098                     boolean shutdown = !suspend;
1099                     if (shutdown) {
                              // Raise the state to ST_SHUTTING_DOWN unless it is already at or beyond it.
1100                         for (;;) {
1101                             // We are re-fetching the state as it might have been shutdown in the meantime.
1102                             int oldState = state;
1103                             if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
1104                                     SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
1105                                 break;
1106                             }
1107                         }
1108                         if (success && gracefulShutdownStartTime == 0) {
1109                             // Check if confirmShutdown() was called at the end of the loop.
1110                             if (logger.isErrorEnabled()) {
1111                                 logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
1112                                         SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
1113                                         "be called before run() implementation terminates.");
1114                             }
1115                         }
1116                     }
1117 
1118                     try {
1119                         if (shutdown) {
1120                             // Run all remaining tasks and shutdown hooks. At this point the event loop
1121                             // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
1122                             // graceful shutdown with quietPeriod.
1123                             for (;;) {
1124                                 if (confirmShutdown()) {
1125                                     break;
1126                                 }
1127                             }
1128 
1129                             // Now we want to make sure no more tasks can be added from this point. This is
1130                             // achieved by switching the state. Any new tasks beyond this point will be rejected.
1131                             for (;;) {
1132                                 int currentState = state;
1133                                 if (currentState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
1134                                         SingleThreadEventExecutor.this, currentState, ST_SHUTDOWN)) {
1135                                     break;
1136                                 }
1137                             }
1138 
1139                             // We have the final set of tasks in the queue now, no more can be added, run all remaining.
1140                             // No need to loop here, this is the final pass.
1141                             confirmShutdown();
1142                         }
1143                     } finally {
1144                         try {
1145                             if (shutdown) {
1146                                 try {
1147                                     cleanup();
1148                                 } finally {
1149                                     // Lets remove all FastThreadLocals for the Thread as we are about to terminate and
1150                                     // notify the future. The user may block on the future and once it unblocks the JVM
1151                                     // may terminate and start unloading classes.
1152                                     // See https://github.com/netty/netty/issues/6596.
1153                                     FastThreadLocal.removeAll();
1154 
                                          // Order: publish ST_TERMINATED, release awaitTermination() waiters, count
                                          // what is left in the queue, and only then complete the future.
1155                                     STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
1156                                     threadLock.countDown();
1157                                     int numUserTasks = drainTasks();
1158                                     if (numUserTasks > 0 && logger.isWarnEnabled()) {
1159                                         logger.warn("An event executor terminated with " +
1160                                                 "non-empty task queue (" + numUserTasks + ')');
1161                                     }
1162                                     if (unexpectedException == null) {
1163                                         terminationFuture.setSuccess(null);
1164                                     } else {
1165                                         terminationFuture.setFailure(unexpectedException);
1166                                     }
1167                                 }
1168                             } else {
1169                                 // Lets remove all FastThreadLocals for the Thread as we are about to terminate it.
1170                                 FastThreadLocal.removeAll();
1171 
1172                                 // Reset the stored threadProperties in case of suspension.
1173                                 threadProperties = null;
1174                             }
1175                         } finally {
                                  // Runs on every path: unbind the thread before releasing the lock.
1176                             thread = null;
1177                             // Let the next thread take over if needed.
1178                             processingLock.unlock();
1179                         }
1180                     }
1181                 }
1182             }
1183         });
1184     }
1185 
1186     final int drainTasks() {
1187         // Empties the task queue and reports how many of the removed entries
1188         // were something other than WAKEUP_TASK.
1189         int drained = 0;
1190         Runnable next;
1191         while ((next = taskQueue.poll()) != null) {
1192             // WAKEUP_TASK entries are added internally and are simply discarded.
1193             // What matters is whether any user tasks were still left behind.
1194             if (next != WAKEUP_TASK) {
1195                 drained++;
1196             }
1197         }
1198         // Only the count is returned; the polled tasks themselves are dropped.
1199         return drained;
1200     }
1201 
1202     private static final class DefaultThreadProperties implements ThreadProperties {
1203         private final Thread thread;
1204 
1205         DefaultThreadProperties(Thread thread) {
1206             this.thread = thread;
1207         }
1208 
1209         @Override
1210         public State state() {
1211             return thread.getState();
1212         }
1213 
1214         @Override
1215         public int priority() {
1216             return thread.getPriority();
1217         }
1218 
1219         @Override
1220         public boolean isInterrupted() {
1221             return thread.isInterrupted();
1222         }
1223 
1224         @Override
1225         public boolean isDaemon() {
1226             return thread.isDaemon();
1227         }
1228 
1229         @Override
1230         public String name() {
1231             return thread.getName();
1232         }
1233 
1234         @Override
1235         public long id() {
1236             return thread.getId();
1237         }
1238 
1239         @Override
1240         public StackTraceElement[] stackTrace() {
1241             return thread.getStackTrace();
1242         }
1243 
1244         @Override
1245         public boolean isAlive() {
1246             return thread.isAlive();
1247         }
1248     }
1249 }