View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.util.concurrent;
17  
18  import io.netty5.util.internal.SystemPropertyUtil;
19  import io.netty5.util.internal.ThreadExecutorMap;
20  import io.netty5.util.internal.logging.InternalLogger;
21  import io.netty5.util.internal.logging.InternalLoggerFactory;
22  import org.jetbrains.annotations.Async.Execute;
23  import org.jetbrains.annotations.Async.Schedule;
24  
25  import java.lang.Thread.State;
26  import java.util.ArrayList;
27  import java.util.LinkedHashSet;
28  import java.util.List;
29  import java.util.Queue;
30  import java.util.Set;
31  import java.util.concurrent.BlockingQueue;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.Executor;
34  import java.util.concurrent.LinkedBlockingQueue;
35  import java.util.concurrent.RejectedExecutionException;
36  import java.util.concurrent.ThreadFactory;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
40  
41  import static java.util.Objects.requireNonNull;
42  
43  /**
44   * An {@link OrderedEventExecutor} implementation that executes all of its submitted tasks in a single thread.
45   *
46   */
47  public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
48  
49      protected static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
50              SystemPropertyUtil.getInt("io.netty5.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
51  
52      private static final InternalLogger logger =
53              InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
54  
55      private static final int ST_NOT_STARTED = 1;
56      private static final int ST_STARTED = 2;
57      private static final int ST_SHUTTING_DOWN = 3;
58      private static final int ST_SHUTDOWN = 4;
59      private static final int ST_TERMINATED = 5;
60  
61      private static final Runnable WAKEUP_TASK = () -> {
62          // Do nothing.
63      };
64      private static final Runnable NOOP_TASK = () -> {
65          // Do nothing.
66      };
67  
68      private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
69              AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
70      private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
71              AtomicReferenceFieldUpdater.newUpdater(
72                      SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
73  
74      private final Queue<Runnable> taskQueue;
75  
76      private volatile Thread thread;
77      @SuppressWarnings("unused")
78      private volatile ThreadProperties threadProperties;
79      private final Executor executor;
80      private volatile boolean interrupted;
81  
82      private final CountDownLatch threadLock = new CountDownLatch(1);
83      private final Set<Runnable> shutdownHooks = new LinkedHashSet<>();
84      private final boolean addTaskWakesUp;
85      private final RejectedExecutionHandler rejectedExecutionHandler;
86  
87      private long lastExecutionTime;
88  
89      @SuppressWarnings({ "FieldMayBeFinal", "unused" })
90      private volatile int state = ST_NOT_STARTED;
91  
92      private volatile long gracefulShutdownQuietPeriod;
93      private volatile long gracefulShutdownTimeout;
94      private long gracefulShutdownStartTime;
95  
96      private final Promise<Void> terminationFuture = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
97  
98      /**
99       * Create a new instance
100      */
101     public SingleThreadEventExecutor() {
102         this(new DefaultThreadFactory(SingleThreadEventExecutor.class));
103     }
104 
105     /**
106      * Create a new instance
107      *
108      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
109      */
110     public SingleThreadEventExecutor(ThreadFactory threadFactory) {
111         this(new ThreadPerTaskExecutor(threadFactory));
112     }
113 
114     /**
115      * Create a new instance
116      *
117      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
118      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
119      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
120      */
121     public SingleThreadEventExecutor(ThreadFactory threadFactory,
122             int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
123         this(new ThreadPerTaskExecutor(threadFactory), maxPendingTasks, rejectedHandler);
124     }
125 
126     /**
127      * Create a new instance
128      *
129      * @param executor          the {@link Executor} which will be used for executing
130      */
131     public SingleThreadEventExecutor(Executor executor) {
132         this(executor, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
133     }
134 
135     /**
136      * Create a new instance
137      *
138      * @param executor          the {@link Executor} which will be used for executing
139      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
140      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
141      */
142     public SingleThreadEventExecutor(Executor executor, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
143         this.executor = ThreadExecutorMap.apply(executor, this);
144         taskQueue = newTaskQueue(Math.max(16, maxPendingTasks));
145         addTaskWakesUp = taskQueue instanceof BlockingQueue;
146         rejectedExecutionHandler = requireNonNull(rejectedHandler, "rejectedHandler");
147     }
148 
149     /**
150      * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
151      * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
152      * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
153      * implementation that does not support blocking operations at all.
154      *
155      * Be aware that the implementation of {@link #run()} depends on a {@link BlockingQueue} so you will need to
156      * override {@link #run()} as well if you return a non {@link BlockingQueue} from this method.
157      *
158      * As this method is called from within the constructor you can only use the parameters passed into the method when
159      * overriding this method.
160      */
161     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
162         return new LinkedBlockingQueue<>(maxPendingTasks);
163     }
164 
165     /**
166      * Interrupt the current running {@link Thread}.
167      */
168     protected final void interruptThread() {
169         Thread currentThread = thread;
170         if (currentThread == null) {
171             interrupted = true;
172         } else {
173             currentThread.interrupt();
174         }
175     }
176 
177     /**
178      * @see Queue#poll()
179      *
180      * This method must be called from the {@link EventExecutor} thread.
181      */
182     protected final Runnable pollTask() {
183         assert inEventLoop();
184 
185         for (;;) {
186             Runnable task = taskQueue.poll();
187             if (task == WAKEUP_TASK) {
188                 continue;
189             }
190             return task;
191         }
192     }
193 
194     /**
195      * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
196      * <p>
197      * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
198      * created via {@link #newTaskQueue(int)}, does not implement {@link BlockingQueue}.
199      * </p>
200      *
201      * This method must be called from the {@link EventExecutor} thread.
202      *
203      * @return {@code null} if the executor thread has been interrupted or waken up.
204      */
205     protected final Runnable takeTask() {
206         assert inEventLoop();
207         if (!(taskQueue instanceof BlockingQueue)) {
208             throw new UnsupportedOperationException();
209         }
210 
211         BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
212         for (;;) {
213             RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
214             if (scheduledTask == null) {
215                 Runnable task = null;
216                 try {
217                     task = taskQueue.take();
218                     if (task == WAKEUP_TASK) {
219                         task = null;
220                     }
221                 } catch (InterruptedException e) {
222                     // Ignore
223                 }
224                 return task;
225             } else {
226                 long delayNanos = scheduledTask.delayNanos();
227                 Runnable task = null;
228                 if (delayNanos > 0) {
229                     try {
230                         task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
231                     } catch (InterruptedException e) {
232                         // Waken up.
233                         return null;
234                     }
235                 }
236                 if (task == null) {
237                     // We need to fetch the scheduled tasks now as otherwise there may be a chance that
238                     // scheduled tasks are never executed if there is always one task in the taskQueue.
239                     // This is for example true for the read task of OIO Transport
240                     // See https://github.com/netty/netty/issues/1614
241                     fetchFromScheduledTaskQueue();
242                     task = taskQueue.poll();
243                 }
244 
245                 if (task != null) {
246                     return task;
247                 }
248             }
249         }
250     }
251 
252     private boolean fetchFromScheduledTaskQueue() {
253         long nanoTime = getCurrentTimeNanos();
254         RunnableScheduledFuture<?> scheduledTask  = pollScheduledTask(nanoTime);
255         while (scheduledTask != null) {
256             if (!taskQueue.offer(scheduledTask)) {
257                 // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
258                 schedule(scheduledTask);
259                 return false;
260             }
261             scheduledTask  = pollScheduledTask(nanoTime);
262         }
263         return true;
264     }
265 
266     /**
267      * @see Queue#isEmpty()
268      */
269     protected final boolean hasTasks() {
270         return !taskQueue.isEmpty();
271     }
272 
273     /**
274      * Return the number of tasks that are pending for processing (excluding the scheduled tasks).
275      */
276     public final int pendingTasks() {
277         return taskQueue.size();
278     }
279 
280     /**
281      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
282      * before.
283      */
284     private void addTask(Runnable task) {
285         if (!offerTask(task)) {
286             rejectedExecutionHandler.rejected(task, this);
287         }
288     }
289 
290     /**
291      * @see Queue#offer(Object)
292      */
293     protected final boolean offerTask(Runnable task) {
294         requireNonNull(task, "task");
295         if (isShutdown()) {
296             reject();
297         }
298         return taskQueue.offer(task);
299     }
300 
301     /**
302      * @see Queue#remove(Object)
303      */
304     protected final boolean removeTask(Runnable task) {
305         return taskQueue.remove(task);
306     }
307 
308     /**
309      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
310      *
311      * This method must be called from the {@link EventExecutor} thread.
312      *
313      * @return {@code true} if and only if at least one task was run
314      */
315     private boolean runAllTasks() {
316         boolean fetchedAll;
317         do {
318             fetchedAll = fetchFromScheduledTaskQueue();
319             Runnable task = pollTask();
320             if (task == null) {
321                 return false;
322             }
323 
324             do {
325                 try {
326                     runTask(task);
327                 } catch (Throwable t) {
328                     logger.warn("A task raised an exception.", t);
329                 }
330             } while ((task = pollTask()) != null);
331         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
332 
333         updateLastExecutionTime();
334         return true;
335     }
336 
337     private void runTask(@Execute Runnable task) {
338         task.run();
339     }
340 
341     /**
342      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
343      *
344      * This method must be called from the {@link EventExecutor} thread.
345      *
346      * @return the number of processed tasks.
347      */
348     protected int runAllTasks(int maxTasks) {
349         assert inEventLoop();
350         boolean fetchedAll;
351         int processedTasks = 0;
352         do {
353             fetchedAll = fetchFromScheduledTaskQueue();
354             for (; processedTasks < maxTasks; processedTasks++) {
355                 Runnable task = pollTask();
356                 if (task == null) {
357                     break;
358                 }
359 
360                 try {
361                     runTask(task);
362                 } catch (Throwable t) {
363                     logger.warn("A task raised an exception.", t);
364                 }
365             }
366         } while (!fetchedAll && processedTasks < maxTasks); // keep on processing until we fetched all scheduled tasks.
367 
368         if (processedTasks > 0) {
369             // Only call if we at least executed one task.
370             updateLastExecutionTime();
371         }
372         return processedTasks;
373     }
374 
375     /**
376      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
377      *
378      * This method must be called from the {@link EventExecutor} thread.
379      */
380     protected final long delayNanos(long currentTimeNanos) {
381         assert inEventLoop();
382         currentTimeNanos -= START_TIME;
383         RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
384         if (scheduledTask == null) {
385             return SCHEDULE_PURGE_INTERVAL;
386         }
387 
388         return scheduledTask.delayNanos(currentTimeNanos);
389     }
390 
391     /**
392      * Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()} ()}) at which the next
393      * closest scheduled task should run or {@code -1} if none is scheduled at the mment.
394      *
395      * This method must be called from the {@link EventExecutor} thread.
396      */
397     protected final long deadlineNanos() {
398         assert inEventLoop();
399         RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
400         return scheduledTask == null ? -1 : scheduledTask.deadlineNanos();
401     }
402 
403     /**
404      * Updates the internal timestamp that tells when a submitted task was executed most recently.
405      * {@link #runAllTasks(int)} updates this timestamp automatically, and thus there's usually no need to call this
406      * method.  However, if you take the tasks manually using {@link #takeTask()} or {@link #pollTask()}, you have to
407      * call this method at the end of task execution loop if you execute a task for accurate quiet period checks.
408      *
409      * This method must be called from the {@link EventExecutor} thread.
410      */
411     protected final void updateLastExecutionTime() {
412         assert inEventLoop();
413         lastExecutionTime = getCurrentTimeNanos();
414     }
415 
416     /**
417      * Run tasks that are submitted to this {@link SingleThreadEventExecutor}.
418      * The implementation depends on the fact that {@link #newTaskQueue(int)} returns a
419      * {@link BlockingQueue}. If you change this by overriding {@link #newTaskQueue(int)}
420      * be aware that you also need to override {@link #run()}.
421      *
422      * This method must be called from the {@link EventExecutor} thread.
423      */
424     protected void run() {
425         assert inEventLoop();
426         do {
427             Runnable task = takeTask();
428             if (task != null) {
429                 runTask(task);
430                 updateLastExecutionTime();
431             }
432         } while (!confirmShutdown());
433     }
434 
435     /**
436      * Do nothing, sub-classes may override.
437      */
438     protected void cleanup() {
439         // NOOP
440         assert inEventLoop();
441     }
442 
443     protected void wakeup(boolean inEventLoop) {
444         if (!inEventLoop) {
445             // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
446             // is already something in the queue.
447             taskQueue.offer(WAKEUP_TASK);
448         }
449     }
450 
451     @Override
452     public final boolean inEventLoop(Thread thread) {
453         return thread == this.thread;
454     }
455 
456     /**
457      * Add a {@link Runnable} which will be executed on shutdown of this instance
458      */
459     public final void addShutdownHook(final Runnable task) {
460         if (inEventLoop()) {
461             shutdownHooks.add(task);
462         } else {
463             execute(() -> shutdownHooks.add(task));
464         }
465     }
466 
467     /**
468      * Remove a previous added {@link Runnable} as a shutdown hook
469      */
470     public final void removeShutdownHook(final Runnable task) {
471         if (inEventLoop()) {
472             shutdownHooks.remove(task);
473         } else {
474             execute(() -> shutdownHooks.remove(task));
475         }
476     }
477 
478     private boolean runShutdownHooks() {
479         boolean ran = false;
480         // Note shutdown hooks can add / remove shutdown hooks.
481         while (!shutdownHooks.isEmpty()) {
482             List<Runnable> copy = new ArrayList<>(shutdownHooks);
483             shutdownHooks.clear();
484             for (Runnable task: copy) {
485                 try {
486                     runTask(task);
487                 } catch (Throwable t) {
488                     logger.warn("Shutdown hook raised an exception.", t);
489                 } finally {
490                     ran = true;
491                 }
492             }
493         }
494 
495         if (ran) {
496             updateLastExecutionTime();
497         }
498 
499         return ran;
500     }
501 
502     @Override
503     public final Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
504         if (quietPeriod < 0) {
505             throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
506         }
507         if (timeout < quietPeriod) {
508             throw new IllegalArgumentException(
509                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
510         }
511         requireNonNull(unit, "unit");
512 
513         if (isShuttingDown()) {
514             return terminationFuture();
515         }
516 
517         boolean inEventLoop = inEventLoop();
518         boolean wakeup;
519         int oldState;
520         for (;;) {
521             if (isShuttingDown()) {
522                 return terminationFuture();
523             }
524             int newState;
525             wakeup = true;
526             oldState = state;
527             if (inEventLoop) {
528                 newState = ST_SHUTTING_DOWN;
529             } else {
530                 switch (oldState) {
531                     case ST_NOT_STARTED:
532                     case ST_STARTED:
533                         newState = ST_SHUTTING_DOWN;
534                         break;
535                     default:
536                         newState = oldState;
537                         wakeup = false;
538                 }
539             }
540             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
541                 break;
542             }
543         }
544         gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
545         gracefulShutdownTimeout = unit.toNanos(timeout);
546 
547         if (ensureThreadStarted(oldState)) {
548             return terminationFuture.asFuture();
549         }
550 
551         if (wakeup) {
552             taskQueue.offer(WAKEUP_TASK);
553             if (!addTaskWakesUp) {
554                 wakeup(inEventLoop);
555             }
556         }
557 
558         return terminationFuture();
559     }
560 
561     @Override
562     public final Future<Void> terminationFuture() {
563         return terminationFuture.asFuture();
564     }
565 
566     @Override
567     public final boolean isShuttingDown() {
568         return state >= ST_SHUTTING_DOWN;
569     }
570 
571     @Override
572     public final boolean isShutdown() {
573         return state >= ST_SHUTDOWN;
574     }
575 
576     @Override
577     public final boolean isTerminated() {
578         return state == ST_TERMINATED;
579     }
580 
581     /**
582      * Confirm that the shutdown if the instance should be done now!
583      *
584      * This method must be called from the {@link EventExecutor} thread.
585      */
586     protected final boolean confirmShutdown() {
587         return confirmShutdown0();
588     }
589 
590     boolean confirmShutdown0() {
591         assert inEventLoop();
592 
593         if (!isShuttingDown()) {
594             return false;
595         }
596 
597         cancelScheduledTasks();
598 
599         if (gracefulShutdownStartTime == 0) {
600             gracefulShutdownStartTime = getCurrentTimeNanos();
601         }
602 
603         if (runAllTasks() || runShutdownHooks()) {
604             if (isShutdown()) {
605                 // Executor shut down - no new tasks anymore.
606                 return true;
607             }
608 
609             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
610             // terminate if the quiet period is 0.
611             // See https://github.com/netty/netty/issues/4241
612             if (gracefulShutdownQuietPeriod == 0) {
613                 return true;
614             }
615             taskQueue.offer(WAKEUP_TASK);
616             return false;
617         }
618 
619         final long nanoTime = getCurrentTimeNanos();
620 
621         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
622             return true;
623         }
624 
625         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
626             // Check if any tasks were added to the queue every 100ms.
627             // TODO: Change the behavior of takeTask() so that it returns on timeout.
628             taskQueue.offer(WAKEUP_TASK);
629             try {
630                 Thread.sleep(100);
631             } catch (InterruptedException e) {
632                 // Ignore
633             }
634 
635             return false;
636         }
637 
638         // No tasks were added for last quiet period - hopefully safe to shut down.
639         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
640         return true;
641     }
642 
643     @Override
644     public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
645         requireNonNull(unit, "unit");
646 
647         if (inEventLoop()) {
648             throw new IllegalStateException("cannot await termination of the current thread");
649         }
650 
651         threadLock.await(timeout, unit);
652 
653         return isTerminated();
654     }
655 
656     @Override
657     public void execute(@Schedule Runnable task) {
658         requireNonNull(task, "task");
659 
660         boolean inEventLoop = inEventLoop();
661         addTask(task);
662         if (!inEventLoop) {
663             startThread();
664             if (isShutdown()) {
665                 boolean reject = false;
666                 try {
667                     if (removeTask(task)) {
668                         reject = true;
669                     }
670                 } catch (UnsupportedOperationException e) {
671                     // The task queue does not support removal so the best thing we can do is to just move on and
672                     // hope we will be able to pick-up the task before its completely terminated.
673                     // In worst case we will log on termination.
674                 }
675                 if (reject) {
676                     reject();
677                 }
678             }
679         }
680 
681         if (!addTaskWakesUp && wakesUpForTask(task)) {
682             wakeup(inEventLoop);
683         }
684     }
685 
686     /**
687      * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}.
688      * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until
689      * it is fully started.
690      *
691      * @throws InterruptedException if this thread is interrupted while waiting for the thread to start.
692      */
693     public final ThreadProperties threadProperties() throws InterruptedException {
694         ThreadProperties threadProperties = this.threadProperties;
695         if (threadProperties == null) {
696             Thread thread = this.thread;
697             if (thread == null) {
698                 assert !inEventLoop();
699                 submit(NOOP_TASK).asStage().sync();
700                 thread = this.thread;
701                 assert thread != null;
702             }
703 
704             threadProperties = new DefaultThreadProperties(thread);
705             if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
706                 threadProperties = this.threadProperties;
707             }
708         }
709 
710         return threadProperties;
711     }
712 
713     /**
714      * Returns {@code true} if {@link #wakeup(boolean)} should be called for this {@link Runnable}, {@code false}
715      * otherwise.
716      */
717     protected boolean wakesUpForTask(@SuppressWarnings("unused") Runnable task) {
718         return true;
719     }
720 
721     protected static void reject() {
722         throw new RejectedExecutionException("event executor terminated");
723     }
724 
725     // ScheduledExecutorService implementation
726 
727     private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
728 
729     private void startThread() {
730         if (state == ST_NOT_STARTED) {
731             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
732                 boolean success = false;
733                 try {
734                     doStartThread();
735                     success = true;
736                 } finally {
737                     if (!success) {
738                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
739                     }
740                 }
741             }
742         }
743     }
744 
745     private boolean ensureThreadStarted(int oldState) {
746         if (oldState == ST_NOT_STARTED) {
747             try {
748                 doStartThread();
749             } catch (Throwable cause) {
750                 STATE_UPDATER.set(this, ST_TERMINATED);
751                 terminationFuture.tryFailure(cause);
752 
753                 if (cause instanceof Error) {
754                     // Rethrow errors so they can't be ignored.
755                     throw cause;
756                 }
757                 return true;
758             }
759         }
760         return false;
761     }
762 
763     private void doStartThread() {
764         assert thread == null;
765         executor.execute(() -> {
766             thread = Thread.currentThread();
767             if (interrupted) {
768                 thread.interrupt();
769             }
770 
771             boolean success = false;
772             updateLastExecutionTime();
773             try {
774                 run();
775                 success = true;
776             } catch (Throwable t) {
777                 logger.warn("Unexpected exception from an event executor: ", t);
778             } finally {
779                 for (;;) {
780                     int oldState = state;
781                     if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
782                             this, oldState, ST_SHUTTING_DOWN)) {
783                         break;
784                     }
785                 }
786 
787                 // Check if confirmShutdown() was called at the end of the loop.
788                 if (success && gracefulShutdownStartTime == 0) {
789                     if (logger.isErrorEnabled()) {
790                         logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
791                                 SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
792                                 "be called before run() implementation terminates.");
793                     }
794                 }
795 
796                 try {
797                     // Run all remaining tasks and shutdown hooks. At this point the event loop
798                     // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
799                     // graceful shutdown with quietPeriod.
800                     for (;;) {
801                         if (confirmShutdown()) {
802                             break;
803                         }
804                     }
805 
806                     // Now we want to make sure no more tasks can be added from this point. This is
807                     // achieved by switching the state. Any new tasks beyond this point will be rejected.
808                     for (;;) {
809                         int oldState = state;
810                         if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
811                                 this, oldState, ST_SHUTDOWN)) {
812                             break;
813                         }
814                     }
815 
816                     // We have the final set of tasks in the queue now, no more can be added, run all remaining.
817                     // No need to loop here, this is the final pass.
818                     confirmShutdown();
819                 } finally {
820                     try {
821                         cleanup();
822                     } finally {
823                         // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
824                         // the future. The user may block on the future and once it unblocks the JVM may terminate
825                         // and start unloading classes.
826                         // See https://github.com/netty/netty/issues/6596.
827                         FastThreadLocal.removeAll();
828 
829                         STATE_UPDATER.set(this, ST_TERMINATED);
830                         threadLock.countDown();
831                         int numUserTasks = drainTasks();
832                         if (numUserTasks > 0 && logger.isWarnEnabled()) {
833                             logger.warn("An event executor terminated with " +
834                                     "non-empty task queue (" + numUserTasks + ')');
835                         }
836                         terminationFuture.setSuccess(null);
837                     }
838                 }
839             }
840         });
841     }
842 
843     final int drainTasks() {
844         int numTasks = 0;
845         for (;;) {
846             Runnable runnable = taskQueue.poll();
847             if (runnable == null) {
848                 break;
849             }
850             // WAKEUP_TASK should be just discarded as these are added internally.
851             // The important bit is that we not have any user tasks left.
852             if (WAKEUP_TASK != runnable) {
853                 numTasks++;
854             }
855         }
856         return numTasks;
857     }
858 
859     private static final class DefaultThreadProperties implements ThreadProperties {
860         private final Thread t;
861 
862         DefaultThreadProperties(Thread t) {
863             this.t = t;
864         }
865 
866         @Override
867         public State state() {
868             return t.getState();
869         }
870 
871         @Override
872         public int priority() {
873             return t.getPriority();
874         }
875 
876         @Override
877         public boolean isInterrupted() {
878             return t.isInterrupted();
879         }
880 
881         @Override
882         public boolean isDaemon() {
883             return t.isDaemon();
884         }
885 
886         @Override
887         public String name() {
888             return t.getName();
889         }
890 
891         @Override
892         public long id() {
893             return t.getId();
894         }
895 
896         @Override
897         public StackTraceElement[] stackTrace() {
898             return t.getStackTrace();
899         }
900 
901         @Override
902         public boolean isAlive() {
903             return t.isAlive();
904         }
905     }
906 }