View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.UnstableApi;
22  import io.netty.util.internal.logging.InternalLogger;
23  import io.netty.util.internal.logging.InternalLoggerFactory;
24  
25  import java.lang.Thread.State;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.LinkedHashSet;
29  import java.util.List;
30  import java.util.Queue;
31  import java.util.Set;
32  import java.util.concurrent.BlockingQueue;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.LinkedBlockingQueue;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.Semaphore;
39  import java.util.concurrent.ThreadFactory;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
43  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
44  
45  /**
46   * Abstract base class for {@link OrderedEventExecutor}s that execute all their submitted tasks in a single thread.
47   *
48   */
49  public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
50  
51      static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
52              SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
53  
54      private static final InternalLogger logger =
55              InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
56  
57      private static final int ST_NOT_STARTED = 1;
58      private static final int ST_STARTED = 2;
59      private static final int ST_SHUTTING_DOWN = 3;
60      private static final int ST_SHUTDOWN = 4;
61      private static final int ST_TERMINATED = 5;
62  
63      private static final Runnable WAKEUP_TASK = new Runnable() {
64          @Override
65          public void run() {
66              // Do nothing.
67          }
68      };
69      private static final Runnable NOOP_TASK = new Runnable() {
70          @Override
71          public void run() {
72              // Do nothing.
73          }
74      };
75  
76      private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
77              AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
78      private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
79              AtomicReferenceFieldUpdater.newUpdater(
80                      SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
81  
82      private final Queue<Runnable> taskQueue;
83  
84      private volatile Thread thread;
85      @SuppressWarnings("unused")
86      private volatile ThreadProperties threadProperties;
87      private final Executor executor;
88      private volatile boolean interrupted;
89  
90      private final Semaphore threadLock = new Semaphore(0);
91      private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
92      private final boolean addTaskWakesUp;
93      private final int maxPendingTasks;
94      private final RejectedExecutionHandler rejectedExecutionHandler;
95  
96      private long lastExecutionTime;
97  
98      @SuppressWarnings({ "FieldMayBeFinal", "unused" })
99      private volatile int state = ST_NOT_STARTED;
100 
101     private volatile long gracefulShutdownQuietPeriod;
102     private volatile long gracefulShutdownTimeout;
103     private long gracefulShutdownStartTime;
104 
105     private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
106 
/**
 * Creates an executor that runs on a thread obtained from {@code threadFactory}, using the default
 * pending-task limit and the default rejection handler.
 *
 * @param parent            the {@link EventExecutorGroup} this instance belongs to
 * @param threadFactory     the {@link ThreadFactory} that supplies the executor {@link Thread}; it is wrapped
 *                          in a {@link ThreadPerTaskExecutor}
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        ThreadFactory threadFactory,
        boolean addTaskWakesUp) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
}
119 
/**
 * Creates an executor that runs on a thread obtained from {@code threadFactory}, with an explicit
 * pending-task limit and rejection handler.
 *
 * @param parent            the {@link EventExecutorGroup} this instance belongs to
 * @param threadFactory     the {@link ThreadFactory} that supplies the executor {@link Thread}; it is wrapped
 *                          in a {@link ThreadPerTaskExecutor}
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
 * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        ThreadFactory threadFactory,
        boolean addTaskWakesUp,
        int maxPendingTasks,
        RejectedExecutionHandler rejectedHandler) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
135 
/**
 * Creates an executor on top of the given {@link Executor}, limited to
 * {@code DEFAULT_MAX_PENDING_EXECUTOR_TASKS} pending tasks and using {@code RejectedExecutionHandlers.reject()}
 * as rejection handler.
 *
 * @param parent            the {@link EventExecutorGroup} this instance belongs to
 * @param executor          the {@link Executor} which will be used for executing
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        Executor executor,
        boolean addTaskWakesUp) {
    this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
}
147 
/**
 * Primary constructor; all other constructors delegate to it.
 *
 * @param parent            the {@link EventExecutorGroup} this instance belongs to
 * @param executor          the {@link Executor} which will be used for executing; must not be {@code null}
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected; values
 *                          below 16 are raised to 16
 * @param rejectedHandler   the {@link RejectedExecutionHandler} to use; must not be {@code null}
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    // Never allow fewer than 16 pending tasks.
    this.maxPendingTasks = maxPendingTasks < 16 ? 16 : maxPendingTasks;
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    // The queue factory is overridable and receives the already clamped limit.
    this.taskQueue = newTaskQueue(this.maxPendingTasks);
    this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
168 
169     /**
170      * @deprecated Please use and override {@link #newTaskQueue(int)}.
171      */
172     @Deprecated
173     protected Queue<Runnable> newTaskQueue() {
174         return newTaskQueue(maxPendingTasks);
175     }
176 
177     /**
178      * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
179      * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
180      * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
181      * implementation that does not support blocking operations at all.
182      */
183     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
184         return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
185     }
186 
187     /**
188      * Interrupt the current running {@link Thread}.
189      */
190     protected void interruptThread() {
191         Thread currentThread = thread;
192         if (currentThread == null) {
193             interrupted = true;
194         } else {
195             currentThread.interrupt();
196         }
197     }
198 
199     /**
200      * @see Queue#poll()
201      */
202     protected Runnable pollTask() {
203         assert inEventLoop();
204         return pollTaskFrom(taskQueue);
205     }
206 
207     protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
208         for (;;) {
209             Runnable task = taskQueue.poll();
210             if (task == WAKEUP_TASK) {
211                 continue;
212             }
213             return task;
214         }
215     }
216 
217     /**
218      * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
219      * <p>
220      * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
221      * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
222      * </p>
223      *
224      * @return {@code null} if the executor thread has been interrupted or waken up.
225      */
226     protected Runnable takeTask() {
227         assert inEventLoop();
228         if (!(taskQueue instanceof BlockingQueue)) {
229             throw new UnsupportedOperationException();
230         }
231 
232         BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
233         for (;;) {
234             ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
235             if (scheduledTask == null) {
236                 Runnable task = null;
237                 try {
238                     task = taskQueue.take();
239                     if (task == WAKEUP_TASK) {
240                         task = null;
241                     }
242                 } catch (InterruptedException e) {
243                     // Ignore
244                 }
245                 return task;
246             } else {
247                 long delayNanos = scheduledTask.delayNanos();
248                 Runnable task = null;
249                 if (delayNanos > 0) {
250                     try {
251                         task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
252                     } catch (InterruptedException e) {
253                         // Waken up.
254                         return null;
255                     }
256                 }
257                 if (task == null) {
258                     // We need to fetch the scheduled tasks now as otherwise there may be a chance that
259                     // scheduled tasks are never executed if there is always one task in the taskQueue.
260                     // This is for example true for the read task of OIO Transport
261                     // See https://github.com/netty/netty/issues/1614
262                     fetchFromScheduledTaskQueue();
263                     task = taskQueue.poll();
264                 }
265 
266                 if (task != null) {
267                     return task;
268                 }
269             }
270         }
271     }
272 
273     private boolean fetchFromScheduledTaskQueue() {
274         long nanoTime = AbstractScheduledEventExecutor.nanoTime();
275         Runnable scheduledTask  = pollScheduledTask(nanoTime);
276         while (scheduledTask != null) {
277             if (!taskQueue.offer(scheduledTask)) {
278                 // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
279                 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
280                 return false;
281             }
282             scheduledTask  = pollScheduledTask(nanoTime);
283         }
284         return true;
285     }
286 
287     /**
288      * @see Queue#peek()
289      */
290     protected Runnable peekTask() {
291         assert inEventLoop();
292         return taskQueue.peek();
293     }
294 
295     /**
296      * @see Queue#isEmpty()
297      */
298     protected boolean hasTasks() {
299         assert inEventLoop();
300         return !taskQueue.isEmpty();
301     }
302 
303     /**
304      * Return the number of tasks that are pending for processing.
305      *
306      * <strong>Be aware that this operation may be expensive as it depends on the internal implementation of the
307      * SingleThreadEventExecutor. So use it with care!</strong>
308      */
309     public int pendingTasks() {
310         return taskQueue.size();
311     }
312 
313     /**
314      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
315      * before.
316      */
317     protected void addTask(Runnable task) {
318         if (task == null) {
319             throw new NullPointerException("task");
320         }
321         if (!offerTask(task)) {
322             reject(task);
323         }
324     }
325 
326     final boolean offerTask(Runnable task) {
327         if (isShutdown()) {
328             reject();
329         }
330         return taskQueue.offer(task);
331     }
332 
333     /**
334      * @see Queue#remove(Object)
335      */
336     protected boolean removeTask(Runnable task) {
337         if (task == null) {
338             throw new NullPointerException("task");
339         }
340         return taskQueue.remove(task);
341     }
342 
343     /**
344      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
345      *
346      * @return {@code true} if and only if at least one task was run
347      */
348     protected boolean runAllTasks() {
349         assert inEventLoop();
350         boolean fetchedAll;
351         boolean ranAtLeastOne = false;
352 
353         do {
354             fetchedAll = fetchFromScheduledTaskQueue();
355             if (runAllTasksFrom(taskQueue)) {
356                 ranAtLeastOne = true;
357             }
358         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
359 
360         if (ranAtLeastOne) {
361             lastExecutionTime = ScheduledFutureTask.nanoTime();
362         }
363         afterRunningAllTasks();
364         return ranAtLeastOne;
365     }
366 
367     /**
368      * Runs all tasks from the passed {@code taskQueue}.
369      *
370      * @param taskQueue To poll and execute all tasks.
371      *
372      * @return {@code true} if at least one task was executed.
373      */
374     protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
375         Runnable task = pollTaskFrom(taskQueue);
376         if (task == null) {
377             return false;
378         }
379         for (;;) {
380             safeExecute(task);
381             task = pollTaskFrom(taskQueue);
382             if (task == null) {
383                 return true;
384             }
385         }
386     }
387 
388     /**
389      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
390      * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
391      */
392     protected boolean runAllTasks(long timeoutNanos) {
393         fetchFromScheduledTaskQueue();
394         Runnable task = pollTask();
395         if (task == null) {
396             afterRunningAllTasks();
397             return false;
398         }
399 
400         final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
401         long runTasks = 0;
402         long lastExecutionTime;
403         for (;;) {
404             safeExecute(task);
405 
406             runTasks ++;
407 
408             // Check timeout every 64 tasks because nanoTime() is relatively expensive.
409             // XXX: Hard-coded value - will make it configurable if it is really a problem.
410             if ((runTasks & 0x3F) == 0) {
411                 lastExecutionTime = ScheduledFutureTask.nanoTime();
412                 if (lastExecutionTime >= deadline) {
413                     break;
414                 }
415             }
416 
417             task = pollTask();
418             if (task == null) {
419                 lastExecutionTime = ScheduledFutureTask.nanoTime();
420                 break;
421             }
422         }
423 
424         afterRunningAllTasks();
425         this.lastExecutionTime = lastExecutionTime;
426         return true;
427     }
428 
429     /**
430      * Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}.
431      */
432     @UnstableApi
433     protected void afterRunningAllTasks() { }
434     /**
435      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
436      */
437     protected long delayNanos(long currentTimeNanos) {
438         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
439         if (scheduledTask == null) {
440             return SCHEDULE_PURGE_INTERVAL;
441         }
442 
443         return scheduledTask.delayNanos(currentTimeNanos);
444     }
445 
446     /**
447      * Returns the absolute point in time (relative to {@link #nanoTime()}) at which the the next
448      * closest scheduled task should run.
449      */
450     @UnstableApi
451     protected long deadlineNanos() {
452         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
453         if (scheduledTask == null) {
454             return nanoTime() + SCHEDULE_PURGE_INTERVAL;
455         }
456         return scheduledTask.deadlineNanos();
457     }
458 
459     /**
460      * Updates the internal timestamp that tells when a submitted task was executed most recently.
461      * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
462      * usually no need to call this method.  However, if you take the tasks manually using {@link #takeTask()} or
463      * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
464      * checks.
465      */
466     protected void updateLastExecutionTime() {
467         lastExecutionTime = ScheduledFutureTask.nanoTime();
468     }
469 
470     /**
471      *
472      */
473     protected abstract void run();
474 
475     /**
476      * Do nothing, sub-classes may override
477      */
478     protected void cleanup() {
479         // NOOP
480     }
481 
482     protected void wakeup(boolean inEventLoop) {
483         if (!inEventLoop || state == ST_SHUTTING_DOWN) {
484             // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
485             // is already something in the queue.
486             taskQueue.offer(WAKEUP_TASK);
487         }
488     }
489 
490     @Override
491     public boolean inEventLoop(Thread thread) {
492         return thread == this.thread;
493     }
494 
495     /**
496      * Add a {@link Runnable} which will be executed on shutdown of this instance
497      */
498     public void addShutdownHook(final Runnable task) {
499         if (inEventLoop()) {
500             shutdownHooks.add(task);
501         } else {
502             execute(new Runnable() {
503                 @Override
504                 public void run() {
505                     shutdownHooks.add(task);
506                 }
507             });
508         }
509     }
510 
511     /**
512      * Remove a previous added {@link Runnable} as a shutdown hook
513      */
514     public void removeShutdownHook(final Runnable task) {
515         if (inEventLoop()) {
516             shutdownHooks.remove(task);
517         } else {
518             execute(new Runnable() {
519                 @Override
520                 public void run() {
521                     shutdownHooks.remove(task);
522                 }
523             });
524         }
525     }
526 
527     private boolean runShutdownHooks() {
528         boolean ran = false;
529         // Note shutdown hooks can add / remove shutdown hooks.
530         while (!shutdownHooks.isEmpty()) {
531             List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
532             shutdownHooks.clear();
533             for (Runnable task: copy) {
534                 try {
535                     task.run();
536                 } catch (Throwable t) {
537                     logger.warn("Shutdown hook raised an exception.", t);
538                 } finally {
539                     ran = true;
540                 }
541             }
542         }
543 
544         if (ran) {
545             lastExecutionTime = ScheduledFutureTask.nanoTime();
546         }
547 
548         return ran;
549     }
550 
551     @Override
552     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
553         if (quietPeriod < 0) {
554             throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
555         }
556         if (timeout < quietPeriod) {
557             throw new IllegalArgumentException(
558                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
559         }
560         if (unit == null) {
561             throw new NullPointerException("unit");
562         }
563 
564         if (isShuttingDown()) {
565             return terminationFuture();
566         }
567 
568         boolean inEventLoop = inEventLoop();
569         boolean wakeup;
570         int oldState;
571         for (;;) {
572             if (isShuttingDown()) {
573                 return terminationFuture();
574             }
575             int newState;
576             wakeup = true;
577             oldState = state;
578             if (inEventLoop) {
579                 newState = ST_SHUTTING_DOWN;
580             } else {
581                 switch (oldState) {
582                     case ST_NOT_STARTED:
583                     case ST_STARTED:
584                         newState = ST_SHUTTING_DOWN;
585                         break;
586                     default:
587                         newState = oldState;
588                         wakeup = false;
589                 }
590             }
591             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
592                 break;
593             }
594         }
595         gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
596         gracefulShutdownTimeout = unit.toNanos(timeout);
597 
598         if (oldState == ST_NOT_STARTED) {
599             try {
600                 doStartThread();
601             } catch (Throwable cause) {
602                 STATE_UPDATER.set(this, ST_TERMINATED);
603                 terminationFuture.tryFailure(cause);
604 
605                 if (!(cause instanceof Exception)) {
606                     // Also rethrow as it may be an OOME for example
607                     PlatformDependent.throwException(cause);
608                 }
609                 return terminationFuture;
610             }
611         }
612 
613         if (wakeup) {
614             wakeup(inEventLoop);
615         }
616 
617         return terminationFuture();
618     }
619 
620     @Override
621     public Future<?> terminationFuture() {
622         return terminationFuture;
623     }
624 
625     @Override
626     @Deprecated
627     public void shutdown() {
628         if (isShutdown()) {
629             return;
630         }
631 
632         boolean inEventLoop = inEventLoop();
633         boolean wakeup;
634         int oldState;
635         for (;;) {
636             if (isShuttingDown()) {
637                 return;
638             }
639             int newState;
640             wakeup = true;
641             oldState = state;
642             if (inEventLoop) {
643                 newState = ST_SHUTDOWN;
644             } else {
645                 switch (oldState) {
646                     case ST_NOT_STARTED:
647                     case ST_STARTED:
648                     case ST_SHUTTING_DOWN:
649                         newState = ST_SHUTDOWN;
650                         break;
651                     default:
652                         newState = oldState;
653                         wakeup = false;
654                 }
655             }
656             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
657                 break;
658             }
659         }
660 
661         if (oldState == ST_NOT_STARTED) {
662             try {
663                 doStartThread();
664             } catch (Throwable cause) {
665                 STATE_UPDATER.set(this, ST_TERMINATED);
666                 terminationFuture.tryFailure(cause);
667 
668                 if (!(cause instanceof Exception)) {
669                     // Also rethrow as it may be an OOME for example
670                     PlatformDependent.throwException(cause);
671                 }
672                 return;
673             }
674         }
675 
676         if (wakeup) {
677             wakeup(inEventLoop);
678         }
679     }
680 
681     @Override
682     public boolean isShuttingDown() {
683         return state >= ST_SHUTTING_DOWN;
684     }
685 
686     @Override
687     public boolean isShutdown() {
688         return state >= ST_SHUTDOWN;
689     }
690 
691     @Override
692     public boolean isTerminated() {
693         return state == ST_TERMINATED;
694     }
695 
696     /**
697      * Confirm that the shutdown if the instance should be done now!
698      */
699     protected boolean confirmShutdown() {
700         if (!isShuttingDown()) {
701             return false;
702         }
703 
704         if (!inEventLoop()) {
705             throw new IllegalStateException("must be invoked from an event loop");
706         }
707 
708         cancelScheduledTasks();
709 
710         if (gracefulShutdownStartTime == 0) {
711             gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
712         }
713 
714         if (runAllTasks() || runShutdownHooks()) {
715             if (isShutdown()) {
716                 // Executor shut down - no new tasks anymore.
717                 return true;
718             }
719 
720             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
721             // terminate if the quiet period is 0.
722             // See https://github.com/netty/netty/issues/4241
723             if (gracefulShutdownQuietPeriod == 0) {
724                 return true;
725             }
726             wakeup(true);
727             return false;
728         }
729 
730         final long nanoTime = ScheduledFutureTask.nanoTime();
731 
732         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
733             return true;
734         }
735 
736         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
737             // Check if any tasks were added to the queue every 100ms.
738             // TODO: Change the behavior of takeTask() so that it returns on timeout.
739             wakeup(true);
740             try {
741                 Thread.sleep(100);
742             } catch (InterruptedException e) {
743                 // Ignore
744             }
745 
746             return false;
747         }
748 
749         // No tasks were added for last quiet period - hopefully safe to shut down.
750         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
751         return true;
752     }
753 
754     @Override
755     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
756         if (unit == null) {
757             throw new NullPointerException("unit");
758         }
759 
760         if (inEventLoop()) {
761             throw new IllegalStateException("cannot await termination of the current thread");
762         }
763 
764         if (threadLock.tryAcquire(timeout, unit)) {
765             threadLock.release();
766         }
767 
768         return isTerminated();
769     }
770 
771     @Override
772     public void execute(Runnable task) {
773         if (task == null) {
774             throw new NullPointerException("task");
775         }
776 
777         boolean inEventLoop = inEventLoop();
778         addTask(task);
779         if (!inEventLoop) {
780             startThread();
781             if (isShutdown()) {
782                 boolean reject = false;
783                 try {
784                     if (removeTask(task)) {
785                         reject = true;
786                     }
787                 } catch (UnsupportedOperationException e) {
788                     // The task queue does not support removal so the best thing we can do is to just move on and
789                     // hope we will be able to pick-up the task before its completely terminated.
790                     // In worst case we will log on termination.
791                 }
792                 if (reject) {
793                     reject();
794                 }
795             }
796         }
797 
798         if (!addTaskWakesUp && wakesUpForTask(task)) {
799             wakeup(inEventLoop);
800         }
801     }
802 
803     @Override
804     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
805         throwIfInEventLoop("invokeAny");
806         return super.invokeAny(tasks);
807     }
808 
809     @Override
810     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
811             throws InterruptedException, ExecutionException, TimeoutException {
812         throwIfInEventLoop("invokeAny");
813         return super.invokeAny(tasks, timeout, unit);
814     }
815 
816     @Override
817     public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
818             throws InterruptedException {
819         throwIfInEventLoop("invokeAll");
820         return super.invokeAll(tasks);
821     }
822 
823     @Override
824     public <T> List<java.util.concurrent.Future<T>> invokeAll(
825             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
826         throwIfInEventLoop("invokeAll");
827         return super.invokeAll(tasks, timeout, unit);
828     }
829 
830     private void throwIfInEventLoop(String method) {
831         if (inEventLoop()) {
832             throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
833         }
834     }
835 
836     /**
837      * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}.
838      * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until the
839      * it is fully started.
840      */
841     public final ThreadProperties threadProperties() {
842         ThreadProperties threadProperties = this.threadProperties;
843         if (threadProperties == null) {
844             Thread thread = this.thread;
845             if (thread == null) {
846                 assert !inEventLoop();
847                 submit(NOOP_TASK).syncUninterruptibly();
848                 thread = this.thread;
849                 assert thread != null;
850             }
851 
852             threadProperties = new DefaultThreadProperties(thread);
853             if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
854                 threadProperties = this.threadProperties;
855             }
856         }
857 
858         return threadProperties;
859     }
860 
861     @SuppressWarnings("unused")
862     protected boolean wakesUpForTask(Runnable task) {
863         return true;
864     }
865 
866     protected static void reject() {
867         throw new RejectedExecutionException("event executor terminated");
868     }
869 
870     /**
871      * Offers the task to the associated {@link RejectedExecutionHandler}.
872      *
873      * @param task to reject.
874      */
875     protected final void reject(Runnable task) {
876         rejectedExecutionHandler.rejected(task, this);
877     }
878 
879     // ScheduledExecutorService implementation
880 
881     private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
882 
883     private void startThread() {
884         if (state == ST_NOT_STARTED) {
885             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
886                 try {
887                     doStartThread();
888                 } catch (Throwable cause) {
889                     STATE_UPDATER.set(this, ST_NOT_STARTED);
890                     PlatformDependent.throwException(cause);
891                 }
892             }
893         }
894     }
895 
896     private void doStartThread() {
897         assert thread == null;
898         executor.execute(new Runnable() {
899             @Override
900             public void run() {
901                 thread = Thread.currentThread();
902                 if (interrupted) {
903                     thread.interrupt();
904                 }
905 
906                 boolean success = false;
907                 updateLastExecutionTime();
908                 try {
909                     SingleThreadEventExecutor.this.run();
910                     success = true;
911                 } catch (Throwable t) {
912                     logger.warn("Unexpected exception from an event executor: ", t);
913                 } finally {
914                     for (;;) {
915                         int oldState = state;
916                         if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
917                                 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
918                             break;
919                         }
920                     }
921 
922                     // Check if confirmShutdown() was called at the end of the loop.
923                     if (success && gracefulShutdownStartTime == 0) {
924                         if (logger.isErrorEnabled()) {
925                             logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
926                                     SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
927                                     "be called before run() implementation terminates.");
928                         }
929                     }
930 
931                     try {
932                         // Run all remaining tasks and shutdown hooks.
933                         for (;;) {
934                             if (confirmShutdown()) {
935                                 break;
936                             }
937                         }
938                     } finally {
939                         try {
940                             cleanup();
941                         } finally {
942                             STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
943                             threadLock.release();
944                             if (!taskQueue.isEmpty()) {
945                                 if (logger.isWarnEnabled()) {
946                                     logger.warn("An event executor terminated with " +
947                                             "non-empty task queue (" + taskQueue.size() + ')');
948                                 }
949                             }
950 
951                             terminationFuture.setSuccess(null);
952                         }
953                     }
954                 }
955             }
956         });
957     }
958 
959     private static final class DefaultThreadProperties implements ThreadProperties {
960         private final Thread t;
961 
962         DefaultThreadProperties(Thread t) {
963             this.t = t;
964         }
965 
966         @Override
967         public State state() {
968             return t.getState();
969         }
970 
971         @Override
972         public int priority() {
973             return t.getPriority();
974         }
975 
976         @Override
977         public boolean isInterrupted() {
978             return t.isInterrupted();
979         }
980 
981         @Override
982         public boolean isDaemon() {
983             return t.isDaemon();
984         }
985 
986         @Override
987         public String name() {
988             return t.getName();
989         }
990 
991         @Override
992         public long id() {
993             return t.getId();
994         }
995 
996         @Override
997         public StackTraceElement[] stackTrace() {
998             return t.getStackTrace();
999         }
1000 
1001         @Override
1002         public boolean isAlive() {
1003             return t.isAlive();
1004         }
1005     }
1006 }