View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.ThreadExecutorMap;
22  import io.netty.util.internal.UnstableApi;
23  import io.netty.util.internal.logging.InternalLogger;
24  import io.netty.util.internal.logging.InternalLoggerFactory;
25  import org.jetbrains.annotations.Async.Schedule;
26  
27  import java.lang.Thread.State;
28  import java.util.ArrayList;
29  import java.util.Collection;
30  import java.util.LinkedHashSet;
31  import java.util.List;
32  import java.util.Queue;
33  import java.util.Set;
34  import java.util.concurrent.BlockingQueue;
35  import java.util.concurrent.Callable;
36  import java.util.concurrent.CountDownLatch;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.Executor;
39  import java.util.concurrent.LinkedBlockingQueue;
40  import java.util.concurrent.RejectedExecutionException;
41  import java.util.concurrent.ThreadFactory;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.TimeoutException;
44  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
45  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
46  
47  /**
48   * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
49   *
50   */
51  public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
52  
53      static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
54              SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
55  
56      private static final InternalLogger logger =
57              InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
58  
59      private static final int ST_NOT_STARTED = 1;
60      private static final int ST_STARTED = 2;
61      private static final int ST_SHUTTING_DOWN = 3;
62      private static final int ST_SHUTDOWN = 4;
63      private static final int ST_TERMINATED = 5;
64  
65      private static final Runnable NOOP_TASK = new Runnable() {
66          @Override
67          public void run() {
68              // Do nothing.
69          }
70      };
71  
72      private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
73              AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
74      private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
75              AtomicReferenceFieldUpdater.newUpdater(
76                      SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
77  
78      private final Queue<Runnable> taskQueue;
79  
80      private volatile Thread thread;
81      @SuppressWarnings("unused")
82      private volatile ThreadProperties threadProperties;
83      private final Executor executor;
84      private volatile boolean interrupted;
85  
86      private final CountDownLatch threadLock = new CountDownLatch(1);
87      private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
88      private final boolean addTaskWakesUp;
89      private final int maxPendingTasks;
90      private final RejectedExecutionHandler rejectedExecutionHandler;
91  
92      private long lastExecutionTime;
93  
94      @SuppressWarnings({ "FieldMayBeFinal", "unused" })
95      private volatile int state = ST_NOT_STARTED;
96  
97      private volatile long gracefulShutdownQuietPeriod;
98      private volatile long gracefulShutdownTimeout;
99      private long gracefulShutdownStartTime;
100 
101     private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
102 
103     /**
104      * Create a new instance
105      *
106      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
107      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
108      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
109      *                          executor thread
110      */
111     protected SingleThreadEventExecutor(
112             EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
113         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
114     }
115 
116     /**
117      * Create a new instance
118      *
119      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
120      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
121      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
122      *                          executor thread
123      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
124      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
125      */
126     protected SingleThreadEventExecutor(
127             EventExecutorGroup parent, ThreadFactory threadFactory,
128             boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
129         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
130     }
131 
132     /**
133      * Create a new instance
134      *
135      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
136      * @param executor          the {@link Executor} which will be used for executing
137      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
138      *                          executor thread
139      */
140     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
141         this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
142     }
143 
144     /**
145      * Create a new instance
146      *
147      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
148      * @param executor          the {@link Executor} which will be used for executing
149      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
150      *                          executor thread
151      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
152      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
153      */
154     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
155                                         boolean addTaskWakesUp, int maxPendingTasks,
156                                         RejectedExecutionHandler rejectedHandler) {
157         super(parent);
158         this.addTaskWakesUp = addTaskWakesUp;
159         this.maxPendingTasks = Math.max(16, maxPendingTasks);
160         this.executor = ThreadExecutorMap.apply(executor, this);
161         taskQueue = newTaskQueue(this.maxPendingTasks);
162         rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
163     }
164 
165     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
166                                         boolean addTaskWakesUp, Queue<Runnable> taskQueue,
167                                         RejectedExecutionHandler rejectedHandler) {
168         super(parent);
169         this.addTaskWakesUp = addTaskWakesUp;
170         this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
171         this.executor = ThreadExecutorMap.apply(executor, this);
172         this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
173         this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
174     }
175 
176     /**
177      * @deprecated Please use and override {@link #newTaskQueue(int)}.
178      */
179     @Deprecated
180     protected Queue<Runnable> newTaskQueue() {
181         return newTaskQueue(maxPendingTasks);
182     }
183 
184     /**
185      * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
186      * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
187      * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
188      * implementation that does not support blocking operations at all.
189      */
190     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
191         return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
192     }
193 
194     /**
195      * Interrupt the current running {@link Thread}.
196      */
197     protected void interruptThread() {
198         Thread currentThread = thread;
199         if (currentThread == null) {
200             interrupted = true;
201         } else {
202             currentThread.interrupt();
203         }
204     }
205 
206     /**
207      * @see Queue#poll()
208      */
209     protected Runnable pollTask() {
210         assert inEventLoop();
211         return pollTaskFrom(taskQueue);
212     }
213 
214     protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
215         for (;;) {
216             Runnable task = taskQueue.poll();
217             if (task != WAKEUP_TASK) {
218                 return task;
219             }
220         }
221     }
222 
223     /**
224      * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
225      * <p>
226      * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
227      * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
228      * </p>
229      *
230      * @return {@code null} if the executor thread has been interrupted or waken up.
231      */
232     protected Runnable takeTask() {
233         assert inEventLoop();
234         if (!(taskQueue instanceof BlockingQueue)) {
235             throw new UnsupportedOperationException();
236         }
237 
238         BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
239         for (;;) {
240             ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
241             if (scheduledTask == null) {
242                 Runnable task = null;
243                 try {
244                     task = taskQueue.take();
245                     if (task == WAKEUP_TASK) {
246                         task = null;
247                     }
248                 } catch (InterruptedException e) {
249                     // Ignore
250                 }
251                 return task;
252             } else {
253                 long delayNanos = scheduledTask.delayNanos();
254                 Runnable task = null;
255                 if (delayNanos > 0) {
256                     try {
257                         task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
258                     } catch (InterruptedException e) {
259                         // Waken up.
260                         return null;
261                     }
262                 }
263                 if (task == null) {
264                     // We need to fetch the scheduled tasks now as otherwise there may be a chance that
265                     // scheduled tasks are never executed if there is always one task in the taskQueue.
266                     // This is for example true for the read task of OIO Transport
267                     // See https://github.com/netty/netty/issues/1614
268                     fetchFromScheduledTaskQueue();
269                     task = taskQueue.poll();
270                 }
271 
272                 if (task != null) {
273                     return task;
274                 }
275             }
276         }
277     }
278 
279     private boolean fetchFromScheduledTaskQueue() {
280         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
281             return true;
282         }
283         long nanoTime = getCurrentTimeNanos();
284         for (;;) {
285             Runnable scheduledTask = pollScheduledTask(nanoTime);
286             if (scheduledTask == null) {
287                 return true;
288             }
289             if (!taskQueue.offer(scheduledTask)) {
290                 // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
291                 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
292                 return false;
293             }
294         }
295     }
296 
297     /**
298      * @return {@code true} if at least one scheduled task was executed.
299      */
300     private boolean executeExpiredScheduledTasks() {
301         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
302             return false;
303         }
304         long nanoTime = getCurrentTimeNanos();
305         Runnable scheduledTask = pollScheduledTask(nanoTime);
306         if (scheduledTask == null) {
307             return false;
308         }
309         do {
310             safeExecute(scheduledTask);
311         } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
312         return true;
313     }
314 
315     /**
316      * @see Queue#peek()
317      */
318     protected Runnable peekTask() {
319         assert inEventLoop();
320         return taskQueue.peek();
321     }
322 
323     /**
324      * @see Queue#isEmpty()
325      */
326     protected boolean hasTasks() {
327         assert inEventLoop();
328         return !taskQueue.isEmpty();
329     }
330 
331     /**
332      * Return the number of tasks that are pending for processing.
333      */
334     public int pendingTasks() {
335         return taskQueue.size();
336     }
337 
338     /**
339      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
340      * before.
341      */
342     protected void addTask(Runnable task) {
343         ObjectUtil.checkNotNull(task, "task");
344         if (!offerTask(task)) {
345             reject(task);
346         }
347     }
348 
349     final boolean offerTask(Runnable task) {
350         if (isShutdown()) {
351             reject();
352         }
353         return taskQueue.offer(task);
354     }
355 
356     /**
357      * @see Queue#remove(Object)
358      */
359     protected boolean removeTask(Runnable task) {
360         return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
361     }
362 
363     /**
364      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
365      *
366      * @return {@code true} if and only if at least one task was run
367      */
368     protected boolean runAllTasks() {
369         assert inEventLoop();
370         boolean fetchedAll;
371         boolean ranAtLeastOne = false;
372 
373         do {
374             fetchedAll = fetchFromScheduledTaskQueue();
375             if (runAllTasksFrom(taskQueue)) {
376                 ranAtLeastOne = true;
377             }
378         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
379 
380         if (ranAtLeastOne) {
381             lastExecutionTime = getCurrentTimeNanos();
382         }
383         afterRunningAllTasks();
384         return ranAtLeastOne;
385     }
386 
387     /**
388      * Execute all expired scheduled tasks and all current tasks in the executor queue until both queues are empty,
389      * or {@code maxDrainAttempts} has been exceeded.
390      * @param maxDrainAttempts The maximum amount of times this method attempts to drain from queues. This is to prevent
391      *                         continuous task execution and scheduling from preventing the EventExecutor thread to
392      *                         make progress and return to the selector mechanism to process inbound I/O events.
393      * @return {@code true} if at least one task was run.
394      */
395     protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
396         assert inEventLoop();
397         boolean ranAtLeastOneTask;
398         int drainAttempt = 0;
399         do {
400             // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
401             // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
402             ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
403         } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
404 
405         if (drainAttempt > 0) {
406             lastExecutionTime = getCurrentTimeNanos();
407         }
408         afterRunningAllTasks();
409 
410         return drainAttempt > 0;
411     }
412 
413     /**
414      * Runs all tasks from the passed {@code taskQueue}.
415      *
416      * @param taskQueue To poll and execute all tasks.
417      *
418      * @return {@code true} if at least one task was executed.
419      */
420     protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
421         Runnable task = pollTaskFrom(taskQueue);
422         if (task == null) {
423             return false;
424         }
425         for (;;) {
426             safeExecute(task);
427             task = pollTaskFrom(taskQueue);
428             if (task == null) {
429                 return true;
430             }
431         }
432     }
433 
434     /**
435      * What ever tasks are present in {@code taskQueue} when this method is invoked will be {@link Runnable#run()}.
436      * @param taskQueue the task queue to drain.
437      * @return {@code true} if at least {@link Runnable#run()} was called.
438      */
439     private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
440         Runnable task = pollTaskFrom(taskQueue);
441         if (task == null) {
442             return false;
443         }
444         int remaining = Math.min(maxPendingTasks, taskQueue.size());
445         safeExecute(task);
446         // Use taskQueue.poll() directly rather than pollTaskFrom() since the latter may
447         // silently consume more than one item from the queue (skips over WAKEUP_TASK instances)
448         while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
449             safeExecute(task);
450         }
451         return true;
452     }
453 
454     /**
455      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
456      * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
457      */
458     protected boolean runAllTasks(long timeoutNanos) {
459         fetchFromScheduledTaskQueue();
460         Runnable task = pollTask();
461         if (task == null) {
462             afterRunningAllTasks();
463             return false;
464         }
465 
466         final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
467         long runTasks = 0;
468         long lastExecutionTime;
469         for (;;) {
470             safeExecute(task);
471 
472             runTasks ++;
473 
474             // Check timeout every 64 tasks because nanoTime() is relatively expensive.
475             // XXX: Hard-coded value - will make it configurable if it is really a problem.
476             if ((runTasks & 0x3F) == 0) {
477                 lastExecutionTime = getCurrentTimeNanos();
478                 if (lastExecutionTime >= deadline) {
479                     break;
480                 }
481             }
482 
483             task = pollTask();
484             if (task == null) {
485                 lastExecutionTime = getCurrentTimeNanos();
486                 break;
487             }
488         }
489 
490         afterRunningAllTasks();
491         this.lastExecutionTime = lastExecutionTime;
492         return true;
493     }
494 
495     /**
496      * Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}.
497      */
498     @UnstableApi
499     protected void afterRunningAllTasks() { }
500 
501     /**
502      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
503      */
504     protected long delayNanos(long currentTimeNanos) {
505         currentTimeNanos -= initialNanoTime();
506 
507         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
508         if (scheduledTask == null) {
509             return SCHEDULE_PURGE_INTERVAL;
510         }
511 
512         return scheduledTask.delayNanos(currentTimeNanos);
513     }
514 
515     /**
516      * Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()}) at which the next
517      * closest scheduled task should run.
518      */
519     @UnstableApi
520     protected long deadlineNanos() {
521         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
522         if (scheduledTask == null) {
523             return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
524         }
525         return scheduledTask.deadlineNanos();
526     }
527 
528     /**
529      * Updates the internal timestamp that tells when a submitted task was executed most recently.
530      * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
531      * usually no need to call this method.  However, if you take the tasks manually using {@link #takeTask()} or
532      * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
533      * checks.
534      */
535     protected void updateLastExecutionTime() {
536         lastExecutionTime = getCurrentTimeNanos();
537     }
538 
539     /**
540      * Run the tasks in the {@link #taskQueue}
541      */
542     protected abstract void run();
543 
544     /**
545      * Do nothing, sub-classes may override
546      */
547     protected void cleanup() {
548         // NOOP
549     }
550 
551     protected void wakeup(boolean inEventLoop) {
552         if (!inEventLoop) {
553             // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
554             // is already something in the queue.
555             taskQueue.offer(WAKEUP_TASK);
556         }
557     }
558 
559     @Override
560     public boolean inEventLoop(Thread thread) {
561         return thread == this.thread;
562     }
563 
564     /**
565      * Add a {@link Runnable} which will be executed on shutdown of this instance
566      */
567     public void addShutdownHook(final Runnable task) {
568         if (inEventLoop()) {
569             shutdownHooks.add(task);
570         } else {
571             execute(new Runnable() {
572                 @Override
573                 public void run() {
574                     shutdownHooks.add(task);
575                 }
576             });
577         }
578     }
579 
580     /**
581      * Remove a previous added {@link Runnable} as a shutdown hook
582      */
583     public void removeShutdownHook(final Runnable task) {
584         if (inEventLoop()) {
585             shutdownHooks.remove(task);
586         } else {
587             execute(new Runnable() {
588                 @Override
589                 public void run() {
590                     shutdownHooks.remove(task);
591                 }
592             });
593         }
594     }
595 
596     private boolean runShutdownHooks() {
597         boolean ran = false;
598         // Note shutdown hooks can add / remove shutdown hooks.
599         while (!shutdownHooks.isEmpty()) {
600             List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
601             shutdownHooks.clear();
602             for (Runnable task: copy) {
603                 try {
604                     runTask(task);
605                 } catch (Throwable t) {
606                     logger.warn("Shutdown hook raised an exception.", t);
607                 } finally {
608                     ran = true;
609                 }
610             }
611         }
612 
613         if (ran) {
614             lastExecutionTime = getCurrentTimeNanos();
615         }
616 
617         return ran;
618     }
619 
620     @Override
621     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
622         ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
623         if (timeout < quietPeriod) {
624             throw new IllegalArgumentException(
625                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
626         }
627         ObjectUtil.checkNotNull(unit, "unit");
628 
629         if (isShuttingDown()) {
630             return terminationFuture();
631         }
632 
633         boolean inEventLoop = inEventLoop();
634         boolean wakeup;
635         int oldState;
636         for (;;) {
637             if (isShuttingDown()) {
638                 return terminationFuture();
639             }
640             int newState;
641             wakeup = true;
642             oldState = state;
643             if (inEventLoop) {
644                 newState = ST_SHUTTING_DOWN;
645             } else {
646                 switch (oldState) {
647                     case ST_NOT_STARTED:
648                     case ST_STARTED:
649                         newState = ST_SHUTTING_DOWN;
650                         break;
651                     default:
652                         newState = oldState;
653                         wakeup = false;
654                 }
655             }
656             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
657                 break;
658             }
659         }
660         gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
661         gracefulShutdownTimeout = unit.toNanos(timeout);
662 
663         if (ensureThreadStarted(oldState)) {
664             return terminationFuture;
665         }
666 
667         if (wakeup) {
668             taskQueue.offer(WAKEUP_TASK);
669             if (!addTaskWakesUp) {
670                 wakeup(inEventLoop);
671             }
672         }
673 
674         return terminationFuture();
675     }
676 
677     @Override
678     public Future<?> terminationFuture() {
679         return terminationFuture;
680     }
681 
682     @Override
683     @Deprecated
684     public void shutdown() {
685         if (isShutdown()) {
686             return;
687         }
688 
689         boolean inEventLoop = inEventLoop();
690         boolean wakeup;
691         int oldState;
692         for (;;) {
693             if (isShuttingDown()) {
694                 return;
695             }
696             int newState;
697             wakeup = true;
698             oldState = state;
699             if (inEventLoop) {
700                 newState = ST_SHUTDOWN;
701             } else {
702                 switch (oldState) {
703                     case ST_NOT_STARTED:
704                     case ST_STARTED:
705                     case ST_SHUTTING_DOWN:
706                         newState = ST_SHUTDOWN;
707                         break;
708                     default:
709                         newState = oldState;
710                         wakeup = false;
711                 }
712             }
713             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
714                 break;
715             }
716         }
717 
718         if (ensureThreadStarted(oldState)) {
719             return;
720         }
721 
722         if (wakeup) {
723             taskQueue.offer(WAKEUP_TASK);
724             if (!addTaskWakesUp) {
725                 wakeup(inEventLoop);
726             }
727         }
728     }
729 
730     @Override
731     public boolean isShuttingDown() {
732         return state >= ST_SHUTTING_DOWN;
733     }
734 
735     @Override
736     public boolean isShutdown() {
737         return state >= ST_SHUTDOWN;
738     }
739 
740     @Override
741     public boolean isTerminated() {
742         return state == ST_TERMINATED;
743     }
744 
745     /**
746      * Confirm that the shutdown if the instance should be done now!
747      */
748     protected boolean confirmShutdown() {
749         if (!isShuttingDown()) {
750             return false;
751         }
752 
753         if (!inEventLoop()) {
754             throw new IllegalStateException("must be invoked from an event loop");
755         }
756 
757         cancelScheduledTasks();
758 
759         if (gracefulShutdownStartTime == 0) {
760             gracefulShutdownStartTime = getCurrentTimeNanos();
761         }
762 
763         if (runAllTasks() || runShutdownHooks()) {
764             if (isShutdown()) {
765                 // Executor shut down - no new tasks anymore.
766                 return true;
767             }
768 
769             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
770             // terminate if the quiet period is 0.
771             // See https://github.com/netty/netty/issues/4241
772             if (gracefulShutdownQuietPeriod == 0) {
773                 return true;
774             }
775             taskQueue.offer(WAKEUP_TASK);
776             return false;
777         }
778 
779         final long nanoTime = getCurrentTimeNanos();
780 
781         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
782             return true;
783         }
784 
785         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
786             // Check if any tasks were added to the queue every 100ms.
787             // TODO: Change the behavior of takeTask() so that it returns on timeout.
788             taskQueue.offer(WAKEUP_TASK);
789             try {
790                 Thread.sleep(100);
791             } catch (InterruptedException e) {
792                 // Ignore
793             }
794 
795             return false;
796         }
797 
798         // No tasks were added for last quiet period - hopefully safe to shut down.
799         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
800         return true;
801     }
802 
803     @Override
804     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
805         ObjectUtil.checkNotNull(unit, "unit");
806         if (inEventLoop()) {
807             throw new IllegalStateException("cannot await termination of the current thread");
808         }
809 
810         threadLock.await(timeout, unit);
811 
812         return isTerminated();
813     }
814 
815     @Override
816     public void execute(Runnable task) {
817         execute0(task);
818     }
819 
820     @Override
821     public void lazyExecute(Runnable task) {
822         lazyExecute0(task);
823     }
824 
825     private void execute0(@Schedule Runnable task) {
826         ObjectUtil.checkNotNull(task, "task");
827         execute(task, wakesUpForTask(task));
828     }
829 
830     private void lazyExecute0(@Schedule Runnable task) {
831         execute(ObjectUtil.checkNotNull(task, "task"), false);
832     }
833 
834     private void execute(Runnable task, boolean immediate) {
835         boolean inEventLoop = inEventLoop();
836         addTask(task);
837         if (!inEventLoop) {
838             startThread();
839             if (isShutdown()) {
840                 boolean reject = false;
841                 try {
842                     if (removeTask(task)) {
843                         reject = true;
844                     }
845                 } catch (UnsupportedOperationException e) {
846                     // The task queue does not support removal so the best thing we can do is to just move on and
847                     // hope we will be able to pick-up the task before its completely terminated.
848                     // In worst case we will log on termination.
849                 }
850                 if (reject) {
851                     reject();
852                 }
853             }
854         }
855 
856         if (!addTaskWakesUp && immediate) {
857             wakeup(inEventLoop);
858         }
859     }
860 
861     @Override
862     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
863         throwIfInEventLoop("invokeAny");
864         return super.invokeAny(tasks);
865     }
866 
867     @Override
868     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
869             throws InterruptedException, ExecutionException, TimeoutException {
870         throwIfInEventLoop("invokeAny");
871         return super.invokeAny(tasks, timeout, unit);
872     }
873 
874     @Override
875     public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
876             throws InterruptedException {
877         throwIfInEventLoop("invokeAll");
878         return super.invokeAll(tasks);
879     }
880 
881     @Override
882     public <T> List<java.util.concurrent.Future<T>> invokeAll(
883             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
884         throwIfInEventLoop("invokeAll");
885         return super.invokeAll(tasks, timeout, unit);
886     }
887 
888     private void throwIfInEventLoop(String method) {
889         if (inEventLoop()) {
890             throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
891         }
892     }
893 
894     /**
895      * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}.
896      * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until
897      * it is fully started.
898      */
899     public final ThreadProperties threadProperties() {
900         ThreadProperties threadProperties = this.threadProperties;
901         if (threadProperties == null) {
902             Thread thread = this.thread;
903             if (thread == null) {
904                 assert !inEventLoop();
905                 submit(NOOP_TASK).syncUninterruptibly();
906                 thread = this.thread;
907                 assert thread != null;
908             }
909 
910             threadProperties = new DefaultThreadProperties(thread);
911             if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
912                 threadProperties = this.threadProperties;
913             }
914         }
915 
916         return threadProperties;
917     }
918 
919     /**
920      * @deprecated override {@link SingleThreadEventExecutor#wakesUpForTask} to re-create this behaviour
921      */
922     @Deprecated
923     protected interface NonWakeupRunnable extends LazyRunnable { }
924 
925     /**
926      * Can be overridden to control which tasks require waking the {@link EventExecutor} thread
927      * if it is waiting so that they can be run immediately.
928      */
929     protected boolean wakesUpForTask(Runnable task) {
930         return true;
931     }
932 
933     protected static void reject() {
934         throw new RejectedExecutionException("event executor terminated");
935     }
936 
937     /**
938      * Offers the task to the associated {@link RejectedExecutionHandler}.
939      *
940      * @param task to reject.
941      */
942     protected final void reject(Runnable task) {
943         rejectedExecutionHandler.rejected(task, this);
944     }
945 
946     // ScheduledExecutorService implementation
947 
948     private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
949 
950     private void startThread() {
951         if (state == ST_NOT_STARTED) {
952             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
953                 boolean success = false;
954                 try {
955                     doStartThread();
956                     success = true;
957                 } finally {
958                     if (!success) {
959                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
960                     }
961                 }
962             }
963         }
964     }
965 
966     private boolean ensureThreadStarted(int oldState) {
967         if (oldState == ST_NOT_STARTED) {
968             try {
969                 doStartThread();
970             } catch (Throwable cause) {
971                 STATE_UPDATER.set(this, ST_TERMINATED);
972                 terminationFuture.tryFailure(cause);
973 
974                 if (!(cause instanceof Exception)) {
975                     // Also rethrow as it may be an OOME for example
976                     PlatformDependent.throwException(cause);
977                 }
978                 return true;
979             }
980         }
981         return false;
982     }
983 
984     private void doStartThread() {
985         assert thread == null;
986         executor.execute(new Runnable() {
987             @Override
988             public void run() {
989                 thread = Thread.currentThread();
990                 if (interrupted) {
991                     thread.interrupt();
992                 }
993 
994                 boolean success = false;
995                 updateLastExecutionTime();
996                 try {
997                     SingleThreadEventExecutor.this.run();
998                     success = true;
999                 } catch (Throwable t) {
1000                     logger.warn("Unexpected exception from an event executor: ", t);
1001                 } finally {
1002                     for (;;) {
1003                         int oldState = state;
1004                         if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
1005                                 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
1006                             break;
1007                         }
1008                     }
1009 
1010                     // Check if confirmShutdown() was called at the end of the loop.
1011                     if (success && gracefulShutdownStartTime == 0) {
1012                         if (logger.isErrorEnabled()) {
1013                             logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
1014                                     SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
1015                                     "be called before run() implementation terminates.");
1016                         }
1017                     }
1018 
1019                     try {
1020                         // Run all remaining tasks and shutdown hooks. At this point the event loop
1021                         // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
1022                         // graceful shutdown with quietPeriod.
1023                         for (;;) {
1024                             if (confirmShutdown()) {
1025                                 break;
1026                             }
1027                         }
1028 
1029                         // Now we want to make sure no more tasks can be added from this point. This is
1030                         // achieved by switching the state. Any new tasks beyond this point will be rejected.
1031                         for (;;) {
1032                             int oldState = state;
1033                             if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
1034                                     SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
1035                                 break;
1036                             }
1037                         }
1038 
1039                         // We have the final set of tasks in the queue now, no more can be added, run all remaining.
1040                         // No need to loop here, this is the final pass.
1041                         confirmShutdown();
1042                     } finally {
1043                         try {
1044                             cleanup();
1045                         } finally {
1046                             // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
1047                             // the future. The user may block on the future and once it unblocks the JVM may terminate
1048                             // and start unloading classes.
1049                             // See https://github.com/netty/netty/issues/6596.
1050                             FastThreadLocal.removeAll();
1051 
1052                             STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
1053                             threadLock.countDown();
1054                             int numUserTasks = drainTasks();
1055                             if (numUserTasks > 0 && logger.isWarnEnabled()) {
1056                                 logger.warn("An event executor terminated with " +
1057                                         "non-empty task queue (" + numUserTasks + ')');
1058                             }
1059                             terminationFuture.setSuccess(null);
1060                         }
1061                     }
1062                 }
1063             }
1064         });
1065     }
1066 
1067     final int drainTasks() {
1068         int numTasks = 0;
1069         for (;;) {
1070             Runnable runnable = taskQueue.poll();
1071             if (runnable == null) {
1072                 break;
1073             }
1074             // WAKEUP_TASK should be just discarded as these are added internally.
1075             // The important bit is that we not have any user tasks left.
1076             if (WAKEUP_TASK != runnable) {
1077                 numTasks++;
1078             }
1079         }
1080         return numTasks;
1081     }
1082 
1083     private static final class DefaultThreadProperties implements ThreadProperties {
1084         private final Thread t;
1085 
1086         DefaultThreadProperties(Thread t) {
1087             this.t = t;
1088         }
1089 
1090         @Override
1091         public State state() {
1092             return t.getState();
1093         }
1094 
1095         @Override
1096         public int priority() {
1097             return t.getPriority();
1098         }
1099 
1100         @Override
1101         public boolean isInterrupted() {
1102             return t.isInterrupted();
1103         }
1104 
1105         @Override
1106         public boolean isDaemon() {
1107             return t.isDaemon();
1108         }
1109 
1110         @Override
1111         public String name() {
1112             return t.getName();
1113         }
1114 
1115         @Override
1116         public long id() {
1117             return t.getId();
1118         }
1119 
1120         @Override
1121         public StackTraceElement[] stackTrace() {
1122             return t.getStackTrace();
1123         }
1124 
1125         @Override
1126         public boolean isAlive() {
1127             return t.isAlive();
1128         }
1129     }
1130 }