View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.UnstableApi;
22  import io.netty.util.internal.logging.InternalLogger;
23  import io.netty.util.internal.logging.InternalLoggerFactory;
24  
25  import java.lang.Thread.State;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.LinkedHashSet;
29  import java.util.List;
30  import java.util.Queue;
31  import java.util.Set;
32  import java.util.concurrent.BlockingQueue;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.LinkedBlockingQueue;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.Semaphore;
39  import java.util.concurrent.ThreadFactory;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
43  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
44  
45  /**
46   * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
47   *
48   */
49  public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
50  
51      static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
52              SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
53  
54      private static final InternalLogger logger =
55              InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
56  
57      private static final int ST_NOT_STARTED = 1;
58      private static final int ST_STARTED = 2;
59      private static final int ST_SHUTTING_DOWN = 3;
60      private static final int ST_SHUTDOWN = 4;
61      private static final int ST_TERMINATED = 5;
62  
63      private static final Runnable WAKEUP_TASK = new Runnable() {
64          @Override
65          public void run() {
66              // Do nothing.
67          }
68      };
69      private static final Runnable NOOP_TASK = new Runnable() {
70          @Override
71          public void run() {
72              // Do nothing.
73          }
74      };
75  
76      private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
77              AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
78      private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
79              AtomicReferenceFieldUpdater.newUpdater(
80                      SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
81  
82      private final Queue<Runnable> taskQueue;
83  
84      private volatile Thread thread;
85      @SuppressWarnings("unused")
86      private volatile ThreadProperties threadProperties;
87      private final Executor executor;
88      private volatile boolean interrupted;
89  
90      private final Semaphore threadLock = new Semaphore(0);
91      private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
92      private final boolean addTaskWakesUp;
93      private final int maxPendingTasks;
94      private final RejectedExecutionHandler rejectedExecutionHandler;
95  
96      private long lastExecutionTime;
97  
98      @SuppressWarnings({ "FieldMayBeFinal", "unused" })
99      private volatile int state = ST_NOT_STARTED;
100 
101     private volatile long gracefulShutdownQuietPeriod;
102     private volatile long gracefulShutdownTimeout;
103     private long gracefulShutdownStartTime;
104 
105     private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
106 
107     /**
108      * Create a new instance
109      *
110      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
111      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
112      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
113      *                          executor thread
114      */
115     protected SingleThreadEventExecutor(
116             EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
117         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
118     }
119 
120     /**
121      * Create a new instance
122      *
123      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
124      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
125      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
126      *                          executor thread
127      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
128      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
129      */
130     protected SingleThreadEventExecutor(
131             EventExecutorGroup parent, ThreadFactory threadFactory,
132             boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
133         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
134     }
135 
136     /**
137      * Create a new instance
138      *
139      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
140      * @param executor          the {@link Executor} which will be used for executing
141      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
142      *                          executor thread
143      */
144     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
145         this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
146     }
147 
148     /**
149      * Create a new instance
150      *
151      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
152      * @param executor          the {@link Executor} which will be used for executing
153      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
154      *                          executor thread
155      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
156      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
157      */
158     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
159                                         boolean addTaskWakesUp, int maxPendingTasks,
160                                         RejectedExecutionHandler rejectedHandler) {
161         super(parent);
162         this.addTaskWakesUp = addTaskWakesUp;
163         this.maxPendingTasks = Math.max(16, maxPendingTasks);
164         this.executor = ObjectUtil.checkNotNull(executor, "executor");
165         taskQueue = newTaskQueue(this.maxPendingTasks);
166         rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
167     }
168 
169     /**
170      * @deprecated Please use and override {@link #newTaskQueue(int)}.
171      */
172     @Deprecated
173     protected Queue<Runnable> newTaskQueue() {
174         return newTaskQueue(maxPendingTasks);
175     }
176 
177     /**
178      * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
179      * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
180      * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
181      * implementation that does not support blocking operations at all.
182      */
183     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
184         return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
185     }
186 
187     /**
188      * Interrupt the current running {@link Thread}.
189      */
190     protected void interruptThread() {
191         Thread currentThread = thread;
192         if (currentThread == null) {
193             interrupted = true;
194         } else {
195             currentThread.interrupt();
196         }
197     }
198 
199     /**
200      * @see Queue#poll()
201      */
202     protected Runnable pollTask() {
203         assert inEventLoop();
204         return pollTaskFrom(taskQueue);
205     }
206 
207     protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
208         for (;;) {
209             Runnable task = taskQueue.poll();
210             if (task == WAKEUP_TASK) {
211                 continue;
212             }
213             return task;
214         }
215     }
216 
217     /**
218      * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
219      * <p>
220      * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
221      * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
222      * </p>
223      *
224      * @return {@code null} if the executor thread has been interrupted or waken up.
225      */
226     protected Runnable takeTask() {
227         assert inEventLoop();
228         if (!(taskQueue instanceof BlockingQueue)) {
229             throw new UnsupportedOperationException();
230         }
231 
232         BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
233         for (;;) {
234             ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
235             if (scheduledTask == null) {
236                 Runnable task = null;
237                 try {
238                     task = taskQueue.take();
239                     if (task == WAKEUP_TASK) {
240                         task = null;
241                     }
242                 } catch (InterruptedException e) {
243                     // Ignore
244                 }
245                 return task;
246             } else {
247                 long delayNanos = scheduledTask.delayNanos();
248                 Runnable task = null;
249                 if (delayNanos > 0) {
250                     try {
251                         task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
252                     } catch (InterruptedException e) {
253                         // Waken up.
254                         return null;
255                     }
256                 }
257                 if (task == null) {
258                     // We need to fetch the scheduled tasks now as otherwise there may be a chance that
259                     // scheduled tasks are never executed if there is always one task in the taskQueue.
260                     // This is for example true for the read task of OIO Transport
261                     // See https://github.com/netty/netty/issues/1614
262                     fetchFromScheduledTaskQueue();
263                     task = taskQueue.poll();
264                 }
265 
266                 if (task != null) {
267                     return task;
268                 }
269             }
270         }
271     }
272 
273     private boolean fetchFromScheduledTaskQueue() {
274         long nanoTime = AbstractScheduledEventExecutor.nanoTime();
275         Runnable scheduledTask  = pollScheduledTask(nanoTime);
276         while (scheduledTask != null) {
277             if (!taskQueue.offer(scheduledTask)) {
278                 // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
279                 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
280                 return false;
281             }
282             scheduledTask  = pollScheduledTask(nanoTime);
283         }
284         return true;
285     }
286 
287     /**
288      * @see Queue#peek()
289      */
290     protected Runnable peekTask() {
291         assert inEventLoop();
292         return taskQueue.peek();
293     }
294 
295     /**
296      * @see Queue#isEmpty()
297      */
298     protected boolean hasTasks() {
299         assert inEventLoop();
300         return !taskQueue.isEmpty();
301     }
302 
303     /**
304      * Return the number of tasks that are pending for processing.
305      *
306      * <strong>Be aware that this operation may be expensive as it depends on the internal implementation of the
307      * SingleThreadEventExecutor. So use it was care!</strong>
308      */
309     public int pendingTasks() {
310         return taskQueue.size();
311     }
312 
313     /**
314      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
315      * before.
316      */
317     protected void addTask(Runnable task) {
318         if (task == null) {
319             throw new NullPointerException("task");
320         }
321         if (!offerTask(task)) {
322             reject(task);
323         }
324     }
325 
326     final boolean offerTask(Runnable task) {
327         if (isShutdown()) {
328             reject();
329         }
330         return taskQueue.offer(task);
331     }
332 
333     /**
334      * @see Queue#remove(Object)
335      */
336     protected boolean removeTask(Runnable task) {
337         if (task == null) {
338             throw new NullPointerException("task");
339         }
340         return taskQueue.remove(task);
341     }
342 
343     /**
344      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
345      *
346      * @return {@code true} if and only if at least one task was run
347      */
348     protected boolean runAllTasks() {
349         assert inEventLoop();
350         boolean fetchedAll;
351         boolean ranAtLeastOne = false;
352 
353         do {
354             fetchedAll = fetchFromScheduledTaskQueue();
355             if (runAllTasksFrom(taskQueue)) {
356                 ranAtLeastOne = true;
357             }
358         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
359 
360         if (ranAtLeastOne) {
361             lastExecutionTime = ScheduledFutureTask.nanoTime();
362         }
363         afterRunningAllTasks();
364         return ranAtLeastOne;
365     }
366 
367     /**
368      * Runs all tasks from the passed {@code taskQueue}.
369      *
370      * @param taskQueue To poll and execute all tasks.
371      *
372      * @return {@code true} if at least one task was executed.
373      */
374     protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
375         Runnable task = pollTaskFrom(taskQueue);
376         if (task == null) {
377             return false;
378         }
379         for (;;) {
380             safeExecute(task);
381             task = pollTaskFrom(taskQueue);
382             if (task == null) {
383                 return true;
384             }
385         }
386     }
387 
388     /**
389      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
390      * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
391      */
392     protected boolean runAllTasks(long timeoutNanos) {
393         fetchFromScheduledTaskQueue();
394         Runnable task = pollTask();
395         if (task == null) {
396             afterRunningAllTasks();
397             return false;
398         }
399 
400         final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
401         long runTasks = 0;
402         long lastExecutionTime;
403         for (;;) {
404             safeExecute(task);
405 
406             runTasks ++;
407 
408             // Check timeout every 64 tasks because nanoTime() is relatively expensive.
409             // XXX: Hard-coded value - will make it configurable if it is really a problem.
410             if ((runTasks & 0x3F) == 0) {
411                 lastExecutionTime = ScheduledFutureTask.nanoTime();
412                 if (lastExecutionTime >= deadline) {
413                     break;
414                 }
415             }
416 
417             task = pollTask();
418             if (task == null) {
419                 lastExecutionTime = ScheduledFutureTask.nanoTime();
420                 break;
421             }
422         }
423 
424         afterRunningAllTasks();
425         this.lastExecutionTime = lastExecutionTime;
426         return true;
427     }
428 
429     /**
430      * Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}.
431      */
432     @UnstableApi
433     protected void afterRunningAllTasks() { }
434     /**
435      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
436      */
437     protected long delayNanos(long currentTimeNanos) {
438         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
439         if (scheduledTask == null) {
440             return SCHEDULE_PURGE_INTERVAL;
441         }
442 
443         return scheduledTask.delayNanos(currentTimeNanos);
444     }
445 
446     /**
447      * Updates the internal timestamp that tells when a submitted task was executed most recently.
448      * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
449      * usually no need to call this method.  However, if you take the tasks manually using {@link #takeTask()} or
450      * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
451      * checks.
452      */
453     protected void updateLastExecutionTime() {
454         lastExecutionTime = ScheduledFutureTask.nanoTime();
455     }
456 
457     /**
458      *
459      */
460     protected abstract void run();
461 
462     /**
463      * Do nothing, sub-classes may override
464      */
465     protected void cleanup() {
466         // NOOP
467     }
468 
469     protected void wakeup(boolean inEventLoop) {
470         if (!inEventLoop || state == ST_SHUTTING_DOWN) {
471             // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
472             // is already something in the queue.
473             taskQueue.offer(WAKEUP_TASK);
474         }
475     }
476 
477     @Override
478     public boolean inEventLoop(Thread thread) {
479         return thread == this.thread;
480     }
481 
482     /**
483      * Add a {@link Runnable} which will be executed on shutdown of this instance
484      */
485     public void addShutdownHook(final Runnable task) {
486         if (inEventLoop()) {
487             shutdownHooks.add(task);
488         } else {
489             execute(new Runnable() {
490                 @Override
491                 public void run() {
492                     shutdownHooks.add(task);
493                 }
494             });
495         }
496     }
497 
498     /**
499      * Remove a previous added {@link Runnable} as a shutdown hook
500      */
501     public void removeShutdownHook(final Runnable task) {
502         if (inEventLoop()) {
503             shutdownHooks.remove(task);
504         } else {
505             execute(new Runnable() {
506                 @Override
507                 public void run() {
508                     shutdownHooks.remove(task);
509                 }
510             });
511         }
512     }
513 
514     private boolean runShutdownHooks() {
515         boolean ran = false;
516         // Note shutdown hooks can add / remove shutdown hooks.
517         while (!shutdownHooks.isEmpty()) {
518             List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
519             shutdownHooks.clear();
520             for (Runnable task: copy) {
521                 try {
522                     task.run();
523                 } catch (Throwable t) {
524                     logger.warn("Shutdown hook raised an exception.", t);
525                 } finally {
526                     ran = true;
527                 }
528             }
529         }
530 
531         if (ran) {
532             lastExecutionTime = ScheduledFutureTask.nanoTime();
533         }
534 
535         return ran;
536     }
537 
538     @Override
539     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
540         if (quietPeriod < 0) {
541             throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
542         }
543         if (timeout < quietPeriod) {
544             throw new IllegalArgumentException(
545                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
546         }
547         if (unit == null) {
548             throw new NullPointerException("unit");
549         }
550 
551         if (isShuttingDown()) {
552             return terminationFuture();
553         }
554 
555         boolean inEventLoop = inEventLoop();
556         boolean wakeup;
557         int oldState;
558         for (;;) {
559             if (isShuttingDown()) {
560                 return terminationFuture();
561             }
562             int newState;
563             wakeup = true;
564             oldState = state;
565             if (inEventLoop) {
566                 newState = ST_SHUTTING_DOWN;
567             } else {
568                 switch (oldState) {
569                     case ST_NOT_STARTED:
570                     case ST_STARTED:
571                         newState = ST_SHUTTING_DOWN;
572                         break;
573                     default:
574                         newState = oldState;
575                         wakeup = false;
576                 }
577             }
578             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
579                 break;
580             }
581         }
582         gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
583         gracefulShutdownTimeout = unit.toNanos(timeout);
584 
585         if (oldState == ST_NOT_STARTED) {
586             try {
587                 doStartThread();
588             } catch (Throwable cause) {
589                 STATE_UPDATER.set(this, ST_TERMINATED);
590                 terminationFuture.tryFailure(cause);
591 
592                 if (!(cause instanceof Exception)) {
593                     // Also rethrow as it may be an OOME for example
594                     PlatformDependent.throwException(cause);
595                 }
596                 return terminationFuture;
597             }
598         }
599 
600         if (wakeup) {
601             wakeup(inEventLoop);
602         }
603 
604         return terminationFuture();
605     }
606 
607     @Override
608     public Future<?> terminationFuture() {
609         return terminationFuture;
610     }
611 
612     @Override
613     @Deprecated
614     public void shutdown() {
615         if (isShutdown()) {
616             return;
617         }
618 
619         boolean inEventLoop = inEventLoop();
620         boolean wakeup;
621         int oldState;
622         for (;;) {
623             if (isShuttingDown()) {
624                 return;
625             }
626             int newState;
627             wakeup = true;
628             oldState = state;
629             if (inEventLoop) {
630                 newState = ST_SHUTDOWN;
631             } else {
632                 switch (oldState) {
633                     case ST_NOT_STARTED:
634                     case ST_STARTED:
635                     case ST_SHUTTING_DOWN:
636                         newState = ST_SHUTDOWN;
637                         break;
638                     default:
639                         newState = oldState;
640                         wakeup = false;
641                 }
642             }
643             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
644                 break;
645             }
646         }
647 
648         if (oldState == ST_NOT_STARTED) {
649             try {
650                 doStartThread();
651             } catch (Throwable cause) {
652                 STATE_UPDATER.set(this, ST_TERMINATED);
653                 terminationFuture.tryFailure(cause);
654 
655                 if (!(cause instanceof Exception)) {
656                     // Also rethrow as it may be an OOME for example
657                     PlatformDependent.throwException(cause);
658                 }
659                 return;
660             }
661         }
662 
663         if (wakeup) {
664             wakeup(inEventLoop);
665         }
666     }
667 
668     @Override
669     public boolean isShuttingDown() {
670         return state >= ST_SHUTTING_DOWN;
671     }
672 
673     @Override
674     public boolean isShutdown() {
675         return state >= ST_SHUTDOWN;
676     }
677 
678     @Override
679     public boolean isTerminated() {
680         return state == ST_TERMINATED;
681     }
682 
683     /**
684      * Confirm that the shutdown if the instance should be done now!
685      */
686     protected boolean confirmShutdown() {
687         if (!isShuttingDown()) {
688             return false;
689         }
690 
691         if (!inEventLoop()) {
692             throw new IllegalStateException("must be invoked from an event loop");
693         }
694 
695         cancelScheduledTasks();
696 
697         if (gracefulShutdownStartTime == 0) {
698             gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
699         }
700 
701         if (runAllTasks() || runShutdownHooks()) {
702             if (isShutdown()) {
703                 // Executor shut down - no new tasks anymore.
704                 return true;
705             }
706 
707             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
708             // terminate if the quiet period is 0.
709             // See https://github.com/netty/netty/issues/4241
710             if (gracefulShutdownQuietPeriod == 0) {
711                 return true;
712             }
713             wakeup(true);
714             return false;
715         }
716 
717         final long nanoTime = ScheduledFutureTask.nanoTime();
718 
719         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
720             return true;
721         }
722 
723         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
724             // Check if any tasks were added to the queue every 100ms.
725             // TODO: Change the behavior of takeTask() so that it returns on timeout.
726             wakeup(true);
727             try {
728                 Thread.sleep(100);
729             } catch (InterruptedException e) {
730                 // Ignore
731             }
732 
733             return false;
734         }
735 
736         // No tasks were added for last quiet period - hopefully safe to shut down.
737         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
738         return true;
739     }
740 
741     @Override
742     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
743         if (unit == null) {
744             throw new NullPointerException("unit");
745         }
746 
747         if (inEventLoop()) {
748             throw new IllegalStateException("cannot await termination of the current thread");
749         }
750 
751         if (threadLock.tryAcquire(timeout, unit)) {
752             threadLock.release();
753         }
754 
755         return isTerminated();
756     }
757 
758     @Override
759     public void execute(Runnable task) {
760         if (task == null) {
761             throw new NullPointerException("task");
762         }
763 
764         boolean inEventLoop = inEventLoop();
765         if (inEventLoop) {
766             addTask(task);
767         } else {
768             startThread();
769             addTask(task);
770             if (isShutdown() && removeTask(task)) {
771                 reject();
772             }
773         }
774 
775         if (!addTaskWakesUp && wakesUpForTask(task)) {
776             wakeup(inEventLoop);
777         }
778     }
779 
780     @Override
781     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
782         throwIfInEventLoop("invokeAny");
783         return super.invokeAny(tasks);
784     }
785 
786     @Override
787     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
788             throws InterruptedException, ExecutionException, TimeoutException {
789         throwIfInEventLoop("invokeAny");
790         return super.invokeAny(tasks, timeout, unit);
791     }
792 
793     @Override
794     public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
795             throws InterruptedException {
796         throwIfInEventLoop("invokeAll");
797         return super.invokeAll(tasks);
798     }
799 
800     @Override
801     public <T> List<java.util.concurrent.Future<T>> invokeAll(
802             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
803         throwIfInEventLoop("invokeAll");
804         return super.invokeAll(tasks, timeout, unit);
805     }
806 
807     private void throwIfInEventLoop(String method) {
808         if (inEventLoop()) {
809             throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
810         }
811     }
812 
813     /**
814      * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}.
815      * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until the
816      * it is fully started.
817      */
818     public final ThreadProperties threadProperties() {
819         ThreadProperties threadProperties = this.threadProperties;
820         if (threadProperties == null) {
821             Thread thread = this.thread;
822             if (thread == null) {
823                 assert !inEventLoop();
824                 submit(NOOP_TASK).syncUninterruptibly();
825                 thread = this.thread;
826                 assert thread != null;
827             }
828 
829             threadProperties = new DefaultThreadProperties(thread);
830             if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
831                 threadProperties = this.threadProperties;
832             }
833         }
834 
835         return threadProperties;
836     }
837 
838     @SuppressWarnings("unused")
839     protected boolean wakesUpForTask(Runnable task) {
840         return true;
841     }
842 
843     protected static void reject() {
844         throw new RejectedExecutionException("event executor terminated");
845     }
846 
847     /**
848      * Offers the task to the associated {@link RejectedExecutionHandler}.
849      *
850      * @param task to reject.
851      */
852     protected final void reject(Runnable task) {
853         rejectedExecutionHandler.rejected(task, this);
854     }
855 
856     // ScheduledExecutorService implementation
857 
858     private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
859 
860     private void startThread() {
861         if (state == ST_NOT_STARTED) {
862             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
863                 try {
864                     doStartThread();
865                 } catch (Throwable cause) {
866                     STATE_UPDATER.set(this, ST_NOT_STARTED);
867                     PlatformDependent.throwException(cause);
868                 }
869             }
870         }
871     }
872 
873     private void doStartThread() {
874         assert thread == null;
875         executor.execute(new Runnable() {
876             @Override
877             public void run() {
878                 thread = Thread.currentThread();
879                 if (interrupted) {
880                     thread.interrupt();
881                 }
882 
883                 boolean success = false;
884                 updateLastExecutionTime();
885                 try {
886                     SingleThreadEventExecutor.this.run();
887                     success = true;
888                 } catch (Throwable t) {
889                     logger.warn("Unexpected exception from an event executor: ", t);
890                 } finally {
891                     for (;;) {
892                         int oldState = state;
893                         if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
894                                 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
895                             break;
896                         }
897                     }
898 
899                     // Check if confirmShutdown() was called at the end of the loop.
900                     if (success && gracefulShutdownStartTime == 0) {
901                         logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
902                                 SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
903                                 "before run() implementation terminates.");
904                     }
905 
906                     try {
907                         // Run all remaining tasks and shutdown hooks.
908                         for (;;) {
909                             if (confirmShutdown()) {
910                                 break;
911                             }
912                         }
913                     } finally {
914                         try {
915                             cleanup();
916                         } finally {
917                             STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
918                             threadLock.release();
919                             if (!taskQueue.isEmpty()) {
920                                 logger.warn(
921                                         "An event executor terminated with " +
922                                                 "non-empty task queue (" + taskQueue.size() + ')');
923                             }
924 
925                             terminationFuture.setSuccess(null);
926                         }
927                     }
928                 }
929             }
930         });
931     }
932 
933     private static final class DefaultThreadProperties implements ThreadProperties {
934         private final Thread t;
935 
936         DefaultThreadProperties(Thread t) {
937             this.t = t;
938         }
939 
940         @Override
941         public State state() {
942             return t.getState();
943         }
944 
945         @Override
946         public int priority() {
947             return t.getPriority();
948         }
949 
950         @Override
951         public boolean isInterrupted() {
952             return t.isInterrupted();
953         }
954 
955         @Override
956         public boolean isDaemon() {
957             return t.isDaemon();
958         }
959 
960         @Override
961         public String name() {
962             return t.getName();
963         }
964 
965         @Override
966         public long id() {
967             return t.getId();
968         }
969 
970         @Override
971         public StackTraceElement[] stackTrace() {
972             return t.getStackTrace();
973         }
974 
975         @Override
976         public boolean isAlive() {
977             return t.isAlive();
978         }
979     }
980 }