View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.ThreadExecutorMap;
22  import io.netty.util.internal.logging.InternalLogger;
23  import io.netty.util.internal.logging.InternalLoggerFactory;
24  import org.jetbrains.annotations.Async.Schedule;
25  
26  import java.lang.Thread.State;
27  import java.util.ArrayList;
28  import java.util.Collection;
29  import java.util.LinkedHashSet;
30  import java.util.List;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.BlockingQueue;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.ThreadFactory;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.TimeoutException;
43  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
45  
46  /**
47   * Abstract base class for {@link OrderedEventExecutor}s that execute all their submitted tasks in a single thread.
48   *
49   */
50  public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
51  
    /**
     * Default upper bound on the number of pending tasks. Read from the system property
     * {@code io.netty.eventexecutor.maxPendingTasks} (falling back to {@link Integer#MAX_VALUE}) and never
     * smaller than 16.
     */
    static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);

    // Lifecycle states. The numeric order matters: isShuttingDown() and isShutdown() compare with ">=".
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;

    /** A task that does nothing when run. */
    private static final Runnable NOOP_TASK = new Runnable() {
        @Override
        public void run() {
            // Do nothing.
        }
    };

    /** Atomic updater used for compare-and-set transitions of {@link #state}. */
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    /** Atomic updater for {@link #threadProperties}. */
    private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");

    /** Queue of submitted tasks; either created by {@link #newTaskQueue(int)} or supplied to the constructor. */
    private final Queue<Runnable> taskQueue;

    /** The executor thread; {@link #inEventLoop(Thread)} compares against it. {@code null} until one is recorded. */
    private volatile Thread thread;
    // Only accessed through PROPERTIES_UPDATER, hence the "unused" suppression.
    @SuppressWarnings("unused")
    private volatile ThreadProperties threadProperties;
    /** The constructor-supplied {@link Executor}, wrapped via {@code ThreadExecutorMap.apply(executor, this)}. */
    private final Executor executor;
    /** Set by {@link #interruptThread()} when no executor thread has been recorded yet. */
    private volatile boolean interrupted;

    /** Latch that {@link #awaitTermination(long, TimeUnit)} waits on. */
    private final CountDownLatch threadLock = new CountDownLatch(1);
    /** Shutdown hooks in registration order; add/remove calls from other threads are re-dispatched via execute(). */
    private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
    /** Whether adding a task is by itself enough to wake the executor thread (no explicit wakeup() needed). */
    private final boolean addTaskWakesUp;
    /** Pending-task limit passed to {@link #newTaskQueue(int)}. */
    private final int maxPendingTasks;
    private final RejectedExecutionHandler rejectedExecutionHandler;

    /** Value of {@code getCurrentTimeNanos()} recorded after tasks were last run; used for the quiet-period check. */
    private long lastExecutionTime;

    // Only written through STATE_UPDATER after construction, hence the suppressions.
    @SuppressWarnings({ "FieldMayBeFinal", "unused" })
    private volatile int state = ST_NOT_STARTED;

    // Graceful-shutdown parameters, stored in nanoseconds by shutdownGracefully(...).
    private volatile long gracefulShutdownQuietPeriod;
    private volatile long gracefulShutdownTimeout;
    /** Time at which confirmShutdown() first ran; 0 means "not recorded yet". */
    private long gracefulShutdownStartTime;

    /** The future returned by {@link #terminationFuture()}. */
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
101 
    /**
     * Create a new instance whose thread is obtained from the given {@link ThreadFactory} (wrapped in a
     * {@link ThreadPerTaskExecutor}). The pending-task limit is {@link #DEFAULT_MAX_PENDING_EXECUTOR_TASKS} and
     * the rejection handler is the one returned by {@link RejectedExecutionHandlers#reject()}.
     *
     * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
     * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
     * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
     *                          executor thread
     */
    protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
        this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
    }
114 
    /**
     * Create a new instance whose thread is obtained from the given {@link ThreadFactory} (wrapped in a
     * {@link ThreadPerTaskExecutor}), with an explicit pending-task limit and rejection handler.
     *
     * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
     * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
     * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
     *                          executor thread
     * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
     * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
     */
    protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory,
            boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
        this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
    }
130 
    /**
     * Create a new instance that runs on the given {@link Executor}. The pending-task limit is
     * {@link #DEFAULT_MAX_PENDING_EXECUTOR_TASKS} and the rejection handler is the one returned by
     * {@link RejectedExecutionHandlers#reject()}.
     *
     * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
     * @param executor          the {@link Executor} which will be used for executing
     * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
     *                          executor thread
     */
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
        this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
    }
142 
143     /**
144      * Create a new instance
145      *
146      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
147      * @param executor          the {@link Executor} which will be used for executing
148      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
149      *                          executor thread
150      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
151      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
152      */
153     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
154                                         boolean addTaskWakesUp, int maxPendingTasks,
155                                         RejectedExecutionHandler rejectedHandler) {
156         super(parent);
157         this.addTaskWakesUp = addTaskWakesUp;
158         this.maxPendingTasks = Math.max(16, maxPendingTasks);
159         this.executor = ThreadExecutorMap.apply(executor, this);
160         taskQueue = newTaskQueue(this.maxPendingTasks);
161         rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
162     }
163 
    /**
     * Create a new instance that runs on the given {@link Executor} and uses a caller-supplied task queue
     * instead of calling {@link #newTaskQueue(int)}.
     *
     * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
     * @param executor          the {@link Executor} which will be used for executing
     * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
     *                          executor thread
     * @param taskQueue         the {@link Queue} that holds the tasks to execute; must not be {@code null}.
     * @param rejectedHandler   the {@link RejectedExecutionHandler} to use; must not be {@code null}.
     */
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        // The queue is supplied by the caller, so only the default limit is recorded here.
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        this.executor = ThreadExecutorMap.apply(executor, this);
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
174 
175     /**
176      * @deprecated Please use and override {@link #newTaskQueue(int)}.
177      */
178     @Deprecated
179     protected Queue<Runnable> newTaskQueue() {
180         return newTaskQueue(maxPendingTasks);
181     }
182 
183     /**
184      * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
185      * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
186      * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
187      * implementation that does not support blocking operations at all.
188      */
189     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
190         return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
191     }
192 
193     /**
194      * Interrupt the current running {@link Thread}.
195      */
196     protected void interruptThread() {
197         Thread currentThread = thread;
198         if (currentThread == null) {
199             interrupted = true;
200         } else {
201             currentThread.interrupt();
202         }
203     }
204 
205     /**
206      * @see Queue#poll()
207      */
208     protected Runnable pollTask() {
209         assert inEventLoop();
210         return pollTaskFrom(taskQueue);
211     }
212 
213     protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
214         for (;;) {
215             Runnable task = taskQueue.poll();
216             if (task != WAKEUP_TASK) {
217                 return task;
218             }
219         }
220     }
221 
222     /**
223      * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
224      * <p>
225      * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
226      * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
227      * </p>
228      *
229      * @return {@code null} if the executor thread has been interrupted or waken up.
230      */
231     protected Runnable takeTask() {
232         assert inEventLoop();
233         if (!(taskQueue instanceof BlockingQueue)) {
234             throw new UnsupportedOperationException();
235         }
236 
237         BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
238         for (;;) {
239             ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
240             if (scheduledTask == null) {
241                 Runnable task = null;
242                 try {
243                     task = taskQueue.take();
244                     if (task == WAKEUP_TASK) {
245                         task = null;
246                     }
247                 } catch (InterruptedException e) {
248                     // Ignore
249                 }
250                 return task;
251             } else {
252                 long delayNanos = scheduledTask.delayNanos();
253                 Runnable task = null;
254                 if (delayNanos > 0) {
255                     try {
256                         task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
257                     } catch (InterruptedException e) {
258                         // Waken up.
259                         return null;
260                     }
261                 }
262                 if (task == null) {
263                     // We need to fetch the scheduled tasks now as otherwise there may be a chance that
264                     // scheduled tasks are never executed if there is always one task in the taskQueue.
265                     // This is for example true for the read task of OIO Transport
266                     // See https://github.com/netty/netty/issues/1614
267                     fetchFromScheduledTaskQueue();
268                     task = taskQueue.poll();
269                 }
270 
271                 if (task != null) {
272                     return task;
273                 }
274             }
275         }
276     }
277 
278     private boolean fetchFromScheduledTaskQueue() {
279         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
280             return true;
281         }
282         long nanoTime = getCurrentTimeNanos();
283         for (;;) {
284             Runnable scheduledTask = pollScheduledTask(nanoTime);
285             if (scheduledTask == null) {
286                 return true;
287             }
288             if (!taskQueue.offer(scheduledTask)) {
289                 // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
290                 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
291                 return false;
292             }
293         }
294     }
295 
296     /**
297      * @return {@code true} if at least one scheduled task was executed.
298      */
299     private boolean executeExpiredScheduledTasks() {
300         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
301             return false;
302         }
303         long nanoTime = getCurrentTimeNanos();
304         Runnable scheduledTask = pollScheduledTask(nanoTime);
305         if (scheduledTask == null) {
306             return false;
307         }
308         do {
309             safeExecute(scheduledTask);
310         } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
311         return true;
312     }
313 
314     /**
315      * @see Queue#peek()
316      */
317     protected Runnable peekTask() {
318         assert inEventLoop();
319         return taskQueue.peek();
320     }
321 
322     /**
323      * @see Queue#isEmpty()
324      */
325     protected boolean hasTasks() {
326         assert inEventLoop();
327         return !taskQueue.isEmpty();
328     }
329 
330     /**
331      * Return the number of tasks that are pending for processing.
332      */
333     public int pendingTasks() {
334         return taskQueue.size();
335     }
336 
337     /**
338      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
339      * before.
340      */
341     protected void addTask(Runnable task) {
342         ObjectUtil.checkNotNull(task, "task");
343         if (!offerTask(task)) {
344             reject(task);
345         }
346     }
347 
348     final boolean offerTask(Runnable task) {
349         if (isShutdown()) {
350             reject();
351         }
352         return taskQueue.offer(task);
353     }
354 
355     /**
356      * @see Queue#remove(Object)
357      */
358     protected boolean removeTask(Runnable task) {
359         return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
360     }
361 
362     /**
363      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
364      *
365      * @return {@code true} if and only if at least one task was run
366      */
367     protected boolean runAllTasks() {
368         assert inEventLoop();
369         boolean fetchedAll;
370         boolean ranAtLeastOne = false;
371 
372         do {
373             fetchedAll = fetchFromScheduledTaskQueue();
374             if (runAllTasksFrom(taskQueue)) {
375                 ranAtLeastOne = true;
376             }
377         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
378 
379         if (ranAtLeastOne) {
380             lastExecutionTime = getCurrentTimeNanos();
381         }
382         afterRunningAllTasks();
383         return ranAtLeastOne;
384     }
385 
386     /**
387      * Execute all expired scheduled tasks and all current tasks in the executor queue until both queues are empty,
388      * or {@code maxDrainAttempts} has been exceeded.
389      * @param maxDrainAttempts The maximum amount of times this method attempts to drain from queues. This is to prevent
390      *                         continuous task execution and scheduling from preventing the EventExecutor thread to
391      *                         make progress and return to the selector mechanism to process inbound I/O events.
392      * @return {@code true} if at least one task was run.
393      */
394     protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
395         assert inEventLoop();
396         boolean ranAtLeastOneTask;
397         int drainAttempt = 0;
398         do {
399             // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
400             // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
401             ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
402         } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
403 
404         if (drainAttempt > 0) {
405             lastExecutionTime = getCurrentTimeNanos();
406         }
407         afterRunningAllTasks();
408 
409         return drainAttempt > 0;
410     }
411 
412     /**
413      * Runs all tasks from the passed {@code taskQueue}.
414      *
415      * @param taskQueue To poll and execute all tasks.
416      *
417      * @return {@code true} if at least one task was executed.
418      */
419     protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
420         Runnable task = pollTaskFrom(taskQueue);
421         if (task == null) {
422             return false;
423         }
424         for (;;) {
425             safeExecute(task);
426             task = pollTaskFrom(taskQueue);
427             if (task == null) {
428                 return true;
429             }
430         }
431     }
432 
433     /**
434      * What ever tasks are present in {@code taskQueue} when this method is invoked will be {@link Runnable#run()}.
435      * @param taskQueue the task queue to drain.
436      * @return {@code true} if at least {@link Runnable#run()} was called.
437      */
438     private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
439         Runnable task = pollTaskFrom(taskQueue);
440         if (task == null) {
441             return false;
442         }
443         int remaining = Math.min(maxPendingTasks, taskQueue.size());
444         safeExecute(task);
445         // Use taskQueue.poll() directly rather than pollTaskFrom() since the latter may
446         // silently consume more than one item from the queue (skips over WAKEUP_TASK instances)
447         while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
448             safeExecute(task);
449         }
450         return true;
451     }
452 
453     /**
454      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
455      * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
456      */
457     protected boolean runAllTasks(long timeoutNanos) {
458         fetchFromScheduledTaskQueue();
459         Runnable task = pollTask();
460         if (task == null) {
461             afterRunningAllTasks();
462             return false;
463         }
464 
465         final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
466         long runTasks = 0;
467         long lastExecutionTime;
468         for (;;) {
469             safeExecute(task);
470 
471             runTasks ++;
472 
473             // Check timeout every 64 tasks because nanoTime() is relatively expensive.
474             // XXX: Hard-coded value - will make it configurable if it is really a problem.
475             if ((runTasks & 0x3F) == 0) {
476                 lastExecutionTime = getCurrentTimeNanos();
477                 if (lastExecutionTime >= deadline) {
478                     break;
479                 }
480             }
481 
482             task = pollTask();
483             if (task == null) {
484                 lastExecutionTime = getCurrentTimeNanos();
485                 break;
486             }
487         }
488 
489         afterRunningAllTasks();
490         this.lastExecutionTime = lastExecutionTime;
491         return true;
492     }
493 
    /**
     * Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}, and also from
     * {@link #runScheduledAndExecutorTasks(int)}. The default implementation does nothing; sub-classes may
     * override it.
     */
    protected void afterRunningAllTasks() { }
498 
499     /**
500      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
501      */
502     protected long delayNanos(long currentTimeNanos) {
503         currentTimeNanos -= initialNanoTime();
504 
505         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
506         if (scheduledTask == null) {
507             return SCHEDULE_PURGE_INTERVAL;
508         }
509 
510         return scheduledTask.delayNanos(currentTimeNanos);
511     }
512 
513     /**
514      * Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()}) at which the next
515      * closest scheduled task should run.
516      */
517     protected long deadlineNanos() {
518         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
519         if (scheduledTask == null) {
520             return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
521         }
522         return scheduledTask.deadlineNanos();
523     }
524 
525     /**
526      * Updates the internal timestamp that tells when a submitted task was executed most recently.
527      * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
528      * usually no need to call this method.  However, if you take the tasks manually using {@link #takeTask()} or
529      * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
530      * checks.
531      */
532     protected void updateLastExecutionTime() {
533         lastExecutionTime = getCurrentTimeNanos();
534     }
535 
    /**
     * Run the tasks in the {@link #taskQueue}. Sub-classes supply the actual processing loop.
     */
    protected abstract void run();
540 
    /**
     * Hook for sub-class clean-up work. The default implementation does nothing; sub-classes may override it.
     */
    protected void cleanup() {
        // NOOP
    }
547 
548     protected void wakeup(boolean inEventLoop) {
549         if (!inEventLoop) {
550             // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
551             // is already something in the queue.
552             taskQueue.offer(WAKEUP_TASK);
553         }
554     }
555 
556     @Override
557     public boolean inEventLoop(Thread thread) {
558         return thread == this.thread;
559     }
560 
561     /**
562      * Add a {@link Runnable} which will be executed on shutdown of this instance
563      */
564     public void addShutdownHook(final Runnable task) {
565         if (inEventLoop()) {
566             shutdownHooks.add(task);
567         } else {
568             execute(new Runnable() {
569                 @Override
570                 public void run() {
571                     shutdownHooks.add(task);
572                 }
573             });
574         }
575     }
576 
577     /**
578      * Remove a previous added {@link Runnable} as a shutdown hook
579      */
580     public void removeShutdownHook(final Runnable task) {
581         if (inEventLoop()) {
582             shutdownHooks.remove(task);
583         } else {
584             execute(new Runnable() {
585                 @Override
586                 public void run() {
587                     shutdownHooks.remove(task);
588                 }
589             });
590         }
591     }
592 
593     private boolean runShutdownHooks() {
594         boolean ran = false;
595         // Note shutdown hooks can add / remove shutdown hooks.
596         while (!shutdownHooks.isEmpty()) {
597             List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
598             shutdownHooks.clear();
599             for (Runnable task: copy) {
600                 try {
601                     runTask(task);
602                 } catch (Throwable t) {
603                     logger.warn("Shutdown hook raised an exception.", t);
604                 } finally {
605                     ran = true;
606                 }
607             }
608         }
609 
610         if (ran) {
611             lastExecutionTime = getCurrentTimeNanos();
612         }
613 
614         return ran;
615     }
616 
617     @Override
618     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
619         ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
620         if (timeout < quietPeriod) {
621             throw new IllegalArgumentException(
622                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
623         }
624         ObjectUtil.checkNotNull(unit, "unit");
625 
626         if (isShuttingDown()) {
627             return terminationFuture();
628         }
629 
630         boolean inEventLoop = inEventLoop();
631         boolean wakeup;
632         int oldState;
633         for (;;) {
634             if (isShuttingDown()) {
635                 return terminationFuture();
636             }
637             int newState;
638             wakeup = true;
639             oldState = state;
640             if (inEventLoop) {
641                 newState = ST_SHUTTING_DOWN;
642             } else {
643                 switch (oldState) {
644                     case ST_NOT_STARTED:
645                     case ST_STARTED:
646                         newState = ST_SHUTTING_DOWN;
647                         break;
648                     default:
649                         newState = oldState;
650                         wakeup = false;
651                 }
652             }
653             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
654                 break;
655             }
656         }
657         gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
658         gracefulShutdownTimeout = unit.toNanos(timeout);
659 
660         if (ensureThreadStarted(oldState)) {
661             return terminationFuture;
662         }
663 
664         if (wakeup) {
665             taskQueue.offer(WAKEUP_TASK);
666             if (!addTaskWakesUp) {
667                 wakeup(inEventLoop);
668             }
669         }
670 
671         return terminationFuture();
672     }
673 
674     @Override
675     public Future<?> terminationFuture() {
676         return terminationFuture;
677     }
678 
679     @Override
680     @Deprecated
681     public void shutdown() {
682         if (isShutdown()) {
683             return;
684         }
685 
686         boolean inEventLoop = inEventLoop();
687         boolean wakeup;
688         int oldState;
689         for (;;) {
690             if (isShuttingDown()) {
691                 return;
692             }
693             int newState;
694             wakeup = true;
695             oldState = state;
696             if (inEventLoop) {
697                 newState = ST_SHUTDOWN;
698             } else {
699                 switch (oldState) {
700                     case ST_NOT_STARTED:
701                     case ST_STARTED:
702                     case ST_SHUTTING_DOWN:
703                         newState = ST_SHUTDOWN;
704                         break;
705                     default:
706                         newState = oldState;
707                         wakeup = false;
708                 }
709             }
710             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
711                 break;
712             }
713         }
714 
715         if (ensureThreadStarted(oldState)) {
716             return;
717         }
718 
719         if (wakeup) {
720             taskQueue.offer(WAKEUP_TASK);
721             if (!addTaskWakesUp) {
722                 wakeup(inEventLoop);
723             }
724         }
725     }
726 
727     @Override
728     public boolean isShuttingDown() {
729         return state >= ST_SHUTTING_DOWN;
730     }
731 
732     @Override
733     public boolean isShutdown() {
734         return state >= ST_SHUTDOWN;
735     }
736 
737     @Override
738     public boolean isTerminated() {
739         return state == ST_TERMINATED;
740     }
741 
742     /**
743      * Confirm that the shutdown if the instance should be done now!
744      */
745     protected boolean confirmShutdown() {
746         if (!isShuttingDown()) {
747             return false;
748         }
749 
750         if (!inEventLoop()) {
751             throw new IllegalStateException("must be invoked from an event loop");
752         }
753 
754         cancelScheduledTasks();
755 
756         if (gracefulShutdownStartTime == 0) {
757             gracefulShutdownStartTime = getCurrentTimeNanos();
758         }
759 
760         if (runAllTasks() || runShutdownHooks()) {
761             if (isShutdown()) {
762                 // Executor shut down - no new tasks anymore.
763                 return true;
764             }
765 
766             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
767             // terminate if the quiet period is 0.
768             // See https://github.com/netty/netty/issues/4241
769             if (gracefulShutdownQuietPeriod == 0) {
770                 return true;
771             }
772             taskQueue.offer(WAKEUP_TASK);
773             return false;
774         }
775 
776         final long nanoTime = getCurrentTimeNanos();
777 
778         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
779             return true;
780         }
781 
782         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
783             // Check if any tasks were added to the queue every 100ms.
784             // TODO: Change the behavior of takeTask() so that it returns on timeout.
785             taskQueue.offer(WAKEUP_TASK);
786             try {
787                 Thread.sleep(100);
788             } catch (InterruptedException e) {
789                 // Ignore
790             }
791 
792             return false;
793         }
794 
795         // No tasks were added for last quiet period - hopefully safe to shut down.
796         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
797         return true;
798     }
799 
800     @Override
801     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
802         ObjectUtil.checkNotNull(unit, "unit");
803         if (inEventLoop()) {
804             throw new IllegalStateException("cannot await termination of the current thread");
805         }
806 
807         threadLock.await(timeout, unit);
808 
809         return isTerminated();
810     }
811 
    /**
     * Submits the given task for execution by this executor; delegates to {@code execute0(Runnable)}.
     */
    @Override
    public void execute(Runnable task) {
        execute0(task);
    }
816 
817     @Override
818     public void lazyExecute(Runnable task) {
819         lazyExecute0(task);
820     }
821 
822     private void execute0(@Schedule Runnable task) {
823         ObjectUtil.checkNotNull(task, "task");
824         execute(task, wakesUpForTask(task));
825     }
826 
827     private void lazyExecute0(@Schedule Runnable task) {
828         execute(ObjectUtil.checkNotNull(task, "task"), false);
829     }
830 
831     private void execute(Runnable task, boolean immediate) {
832         boolean inEventLoop = inEventLoop();
833         addTask(task);
834         if (!inEventLoop) {
835             startThread();
836             if (isShutdown()) {
837                 boolean reject = false;
838                 try {
839                     if (removeTask(task)) {
840                         reject = true;
841                     }
842                 } catch (UnsupportedOperationException e) {
843                     // The task queue does not support removal so the best thing we can do is to just move on and
844                     // hope we will be able to pick-up the task before its completely terminated.
845                     // In worst case we will log on termination.
846                 }
847                 if (reject) {
848                     reject();
849                 }
850             }
851         }
852 
853         if (!addTaskWakesUp && immediate) {
854             wakeup(inEventLoop);
855         }
856     }
857 
858     @Override
859     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
860         throwIfInEventLoop("invokeAny");
861         return super.invokeAny(tasks);
862     }
863 
864     @Override
865     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
866             throws InterruptedException, ExecutionException, TimeoutException {
867         throwIfInEventLoop("invokeAny");
868         return super.invokeAny(tasks, timeout, unit);
869     }
870 
871     @Override
872     public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
873             throws InterruptedException {
874         throwIfInEventLoop("invokeAll");
875         return super.invokeAll(tasks);
876     }
877 
878     @Override
879     public <T> List<java.util.concurrent.Future<T>> invokeAll(
880             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
881         throwIfInEventLoop("invokeAll");
882         return super.invokeAll(tasks, timeout, unit);
883     }
884 
885     private void throwIfInEventLoop(String method) {
886         if (inEventLoop()) {
887             throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
888         }
889     }
890 
891     /**
892      * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}.
893      * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until
894      * it is fully started.
895      */
896     public final ThreadProperties threadProperties() {
897         ThreadProperties threadProperties = this.threadProperties;
898         if (threadProperties == null) {
899             Thread thread = this.thread;
900             if (thread == null) {
901                 assert !inEventLoop();
902                 submit(NOOP_TASK).syncUninterruptibly();
903                 thread = this.thread;
904                 assert thread != null;
905             }
906 
907             threadProperties = new DefaultThreadProperties(thread);
908             if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
909                 threadProperties = this.threadProperties;
910             }
911         }
912 
913         return threadProperties;
914     }
915 
916     /**
917      * @deprecated override {@link SingleThreadEventExecutor#wakesUpForTask} to re-create this behaviour
918      */
919     @Deprecated
920     protected interface NonWakeupRunnable extends LazyRunnable { }
921 
922     /**
923      * Can be overridden to control which tasks require waking the {@link EventExecutor} thread
924      * if it is waiting so that they can be run immediately.
925      */
926     protected boolean wakesUpForTask(Runnable task) {
927         return true;
928     }
929 
930     protected static void reject() {
931         throw new RejectedExecutionException("event executor terminated");
932     }
933 
934     /**
935      * Offers the task to the associated {@link RejectedExecutionHandler}.
936      *
937      * @param task to reject.
938      */
939     protected final void reject(Runnable task) {
940         rejectedExecutionHandler.rejected(task, this);
941     }
942 
943     // ScheduledExecutorService implementation
944 
945     private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
946 
947     private void startThread() {
948         if (state == ST_NOT_STARTED) {
949             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
950                 boolean success = false;
951                 try {
952                     doStartThread();
953                     success = true;
954                 } finally {
955                     if (!success) {
956                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
957                     }
958                 }
959             }
960         }
961     }
962 
963     private boolean ensureThreadStarted(int oldState) {
964         if (oldState == ST_NOT_STARTED) {
965             try {
966                 doStartThread();
967             } catch (Throwable cause) {
968                 STATE_UPDATER.set(this, ST_TERMINATED);
969                 terminationFuture.tryFailure(cause);
970 
971                 if (!(cause instanceof Exception)) {
972                     // Also rethrow as it may be an OOME for example
973                     PlatformDependent.throwException(cause);
974                 }
975                 return true;
976             }
977         }
978         return false;
979     }
980 
981     private void doStartThread() {
982         assert thread == null;
983         executor.execute(new Runnable() {
984             @Override
985             public void run() {
986                 thread = Thread.currentThread();
987                 if (interrupted) {
988                     thread.interrupt();
989                 }
990 
991                 boolean success = false;
992                 updateLastExecutionTime();
993                 try {
994                     SingleThreadEventExecutor.this.run();
995                     success = true;
996                 } catch (Throwable t) {
997                     logger.warn("Unexpected exception from an event executor: ", t);
998                 } finally {
999                     for (;;) {
1000                         int oldState = state;
1001                         if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
1002                                 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
1003                             break;
1004                         }
1005                     }
1006 
1007                     // Check if confirmShutdown() was called at the end of the loop.
1008                     if (success && gracefulShutdownStartTime == 0) {
1009                         if (logger.isErrorEnabled()) {
1010                             logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
1011                                     SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
1012                                     "be called before run() implementation terminates.");
1013                         }
1014                     }
1015 
1016                     try {
1017                         // Run all remaining tasks and shutdown hooks. At this point the event loop
1018                         // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
1019                         // graceful shutdown with quietPeriod.
1020                         for (;;) {
1021                             if (confirmShutdown()) {
1022                                 break;
1023                             }
1024                         }
1025 
1026                         // Now we want to make sure no more tasks can be added from this point. This is
1027                         // achieved by switching the state. Any new tasks beyond this point will be rejected.
1028                         for (;;) {
1029                             int oldState = state;
1030                             if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
1031                                     SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
1032                                 break;
1033                             }
1034                         }
1035 
1036                         // We have the final set of tasks in the queue now, no more can be added, run all remaining.
1037                         // No need to loop here, this is the final pass.
1038                         confirmShutdown();
1039                     } finally {
1040                         try {
1041                             cleanup();
1042                         } finally {
1043                             // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
1044                             // the future. The user may block on the future and once it unblocks the JVM may terminate
1045                             // and start unloading classes.
1046                             // See https://github.com/netty/netty/issues/6596.
1047                             FastThreadLocal.removeAll();
1048 
1049                             STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
1050                             threadLock.countDown();
1051                             int numUserTasks = drainTasks();
1052                             if (numUserTasks > 0 && logger.isWarnEnabled()) {
1053                                 logger.warn("An event executor terminated with " +
1054                                         "non-empty task queue (" + numUserTasks + ')');
1055                             }
1056                             terminationFuture.setSuccess(null);
1057                         }
1058                     }
1059                 }
1060             }
1061         });
1062     }
1063 
1064     final int drainTasks() {
1065         int numTasks = 0;
1066         for (;;) {
1067             Runnable runnable = taskQueue.poll();
1068             if (runnable == null) {
1069                 break;
1070             }
1071             // WAKEUP_TASK should be just discarded as these are added internally.
1072             // The important bit is that we not have any user tasks left.
1073             if (WAKEUP_TASK != runnable) {
1074                 numTasks++;
1075             }
1076         }
1077         return numTasks;
1078     }
1079 
1080     private static final class DefaultThreadProperties implements ThreadProperties {
1081         private final Thread t;
1082 
1083         DefaultThreadProperties(Thread t) {
1084             this.t = t;
1085         }
1086 
1087         @Override
1088         public State state() {
1089             return t.getState();
1090         }
1091 
1092         @Override
1093         public int priority() {
1094             return t.getPriority();
1095         }
1096 
1097         @Override
1098         public boolean isInterrupted() {
1099             return t.isInterrupted();
1100         }
1101 
1102         @Override
1103         public boolean isDaemon() {
1104             return t.isDaemon();
1105         }
1106 
1107         @Override
1108         public String name() {
1109             return t.getName();
1110         }
1111 
1112         @Override
1113         public long id() {
1114             return t.getId();
1115         }
1116 
1117         @Override
1118         public StackTraceElement[] stackTrace() {
1119             return t.getStackTrace();
1120         }
1121 
1122         @Override
1123         public boolean isAlive() {
1124             return t.isAlive();
1125         }
1126     }
1127 }