1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  import io.netty.util.internal.PlatformDependent;
20  import io.netty.util.internal.SystemPropertyUtil;
21  import io.netty.util.internal.ThreadExecutorMap;
22  import io.netty.util.internal.logging.InternalLogger;
23  import io.netty.util.internal.logging.InternalLoggerFactory;
24  import org.jetbrains.annotations.Async.Schedule;
25  
26  import java.lang.Thread.State;
27  import java.util.ArrayList;
28  import java.util.Collection;
29  import java.util.LinkedHashSet;
30  import java.util.List;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.BlockingQueue;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.ThreadFactory;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.TimeoutException;
43  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
45  
46  
47  
48  
49  
50  public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
51  
52      static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
53              SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
54  
55      private static final InternalLogger logger =
56              InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
57  
58      private static final int ST_NOT_STARTED = 1;
59      private static final int ST_STARTED = 2;
60      private static final int ST_SHUTTING_DOWN = 3;
61      private static final int ST_SHUTDOWN = 4;
62      private static final int ST_TERMINATED = 5;
63  
64      private static final Runnable NOOP_TASK = new Runnable() {
65          @Override
66          public void run() {
67              
68          }
69      };
70  
71      private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
72              AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
73      private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
74              AtomicReferenceFieldUpdater.newUpdater(
75                      SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
76  
77      private final Queue<Runnable> taskQueue;
78  
79      private volatile Thread thread;
80      @SuppressWarnings("unused")
81      private volatile ThreadProperties threadProperties;
82      private final Executor executor;
83      private volatile boolean interrupted;
84  
85      private final CountDownLatch threadLock = new CountDownLatch(1);
86      private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
87      private final boolean addTaskWakesUp;
88      private final int maxPendingTasks;
89      private final RejectedExecutionHandler rejectedExecutionHandler;
90  
91      private long lastExecutionTime;
92  
93      @SuppressWarnings({ "FieldMayBeFinal", "unused" })
94      private volatile int state = ST_NOT_STARTED;
95  
96      private volatile long gracefulShutdownQuietPeriod;
97      private volatile long gracefulShutdownTimeout;
98      private long gracefulShutdownStartTime;
99  
100     private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
101 
102     
103 
104 
105 
106 
107 
108 
109 
110     protected SingleThreadEventExecutor(
111             EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
112         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
113     }
114 
115     
116 
117 
118 
119 
120 
121 
122 
123 
124 
125     protected SingleThreadEventExecutor(
126             EventExecutorGroup parent, ThreadFactory threadFactory,
127             boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
128         this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
129     }
130 
131     
132 
133 
134 
135 
136 
137 
138 
139     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
140         this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
141     }
142 
143     
144 
145 
146 
147 
148 
149 
150 
151 
152 
153     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
154                                         boolean addTaskWakesUp, int maxPendingTasks,
155                                         RejectedExecutionHandler rejectedHandler) {
156         super(parent);
157         this.addTaskWakesUp = addTaskWakesUp;
158         this.maxPendingTasks = Math.max(16, maxPendingTasks);
159         this.executor = ThreadExecutorMap.apply(executor, this);
160         taskQueue = newTaskQueue(this.maxPendingTasks);
161         rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
162     }
163 
164     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
165                                         boolean addTaskWakesUp, Queue<Runnable> taskQueue,
166                                         RejectedExecutionHandler rejectedHandler) {
167         super(parent);
168         this.addTaskWakesUp = addTaskWakesUp;
169         this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
170         this.executor = ThreadExecutorMap.apply(executor, this);
171         this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
172         this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
173     }
174 
175     
176 
177 
178     @Deprecated
179     protected Queue<Runnable> newTaskQueue() {
180         return newTaskQueue(maxPendingTasks);
181     }
182 
183     
184 
185 
186 
187 
188 
189     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
190         return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
191     }
192 
193     
194 
195 
196     protected void interruptThread() {
197         Thread currentThread = thread;
198         if (currentThread == null) {
199             interrupted = true;
200         } else {
201             currentThread.interrupt();
202         }
203     }
204 
205     
206 
207 
208     protected Runnable pollTask() {
209         assert inEventLoop();
210         return pollTaskFrom(taskQueue);
211     }
212 
213     protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
214         for (;;) {
215             Runnable task = taskQueue.poll();
216             if (task != WAKEUP_TASK) {
217                 return task;
218             }
219         }
220     }
221 
222     
223 
224 
225 
226 
227 
228 
229 
230 
231     protected Runnable takeTask() {
232         assert inEventLoop();
233         if (!(taskQueue instanceof BlockingQueue)) {
234             throw new UnsupportedOperationException();
235         }
236 
237         BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
238         for (;;) {
239             ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
240             if (scheduledTask == null) {
241                 Runnable task = null;
242                 try {
243                     task = taskQueue.take();
244                     if (task == WAKEUP_TASK) {
245                         task = null;
246                     }
247                 } catch (InterruptedException e) {
248                     
249                 }
250                 return task;
251             } else {
252                 long delayNanos = scheduledTask.delayNanos();
253                 Runnable task = null;
254                 if (delayNanos > 0) {
255                     try {
256                         task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
257                     } catch (InterruptedException e) {
258                         
259                         return null;
260                     }
261                 }
262                 if (task == null) {
263                     
264                     
265                     
266                     
267                     fetchFromScheduledTaskQueue();
268                     task = taskQueue.poll();
269                 }
270 
271                 if (task != null) {
272                     if (task == WAKEUP_TASK) {
273                         return null;
274                     }
275                     return task;
276                 }
277             }
278         }
279     }
280 
281     private boolean fetchFromScheduledTaskQueue() {
282         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
283             return true;
284         }
285         long nanoTime = getCurrentTimeNanos();
286         for (;;) {
287             Runnable scheduledTask = pollScheduledTask(nanoTime);
288             if (scheduledTask == null) {
289                 return true;
290             }
291             if (!taskQueue.offer(scheduledTask)) {
292                 
293                 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
294                 return false;
295             }
296         }
297     }
298 
299     
300 
301 
302     private boolean executeExpiredScheduledTasks() {
303         if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
304             return false;
305         }
306         long nanoTime = getCurrentTimeNanos();
307         Runnable scheduledTask = pollScheduledTask(nanoTime);
308         if (scheduledTask == null) {
309             return false;
310         }
311         do {
312             safeExecute(scheduledTask);
313         } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
314         return true;
315     }
316 
317     
318 
319 
320     protected Runnable peekTask() {
321         assert inEventLoop();
322         return taskQueue.peek();
323     }
324 
325     
326 
327 
328     protected boolean hasTasks() {
329         assert inEventLoop();
330         return !taskQueue.isEmpty();
331     }
332 
333     
334 
335 
336     public int pendingTasks() {
337         return taskQueue.size();
338     }
339 
340     
341 
342 
343 
344     protected void addTask(Runnable task) {
345         ObjectUtil.checkNotNull(task, "task");
346         if (!offerTask(task)) {
347             reject(task);
348         }
349     }
350 
351     final boolean offerTask(Runnable task) {
352         if (isShutdown()) {
353             reject();
354         }
355         return taskQueue.offer(task);
356     }
357 
358     
359 
360 
361     protected boolean removeTask(Runnable task) {
362         return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
363     }
364 
365     
366 
367 
368 
369 
370     protected boolean runAllTasks() {
371         assert inEventLoop();
372         boolean fetchedAll;
373         boolean ranAtLeastOne = false;
374 
375         do {
376             fetchedAll = fetchFromScheduledTaskQueue();
377             if (runAllTasksFrom(taskQueue)) {
378                 ranAtLeastOne = true;
379             }
380         } while (!fetchedAll); 
381 
382         if (ranAtLeastOne) {
383             lastExecutionTime = getCurrentTimeNanos();
384         }
385         afterRunningAllTasks();
386         return ranAtLeastOne;
387     }
388 
389     
390 
391 
392 
393 
394 
395 
396 
397     protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
398         assert inEventLoop();
399         boolean ranAtLeastOneTask;
400         int drainAttempt = 0;
401         do {
402             
403             
404             ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
405         } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
406 
407         if (drainAttempt > 0) {
408             lastExecutionTime = getCurrentTimeNanos();
409         }
410         afterRunningAllTasks();
411 
412         return drainAttempt > 0;
413     }
414 
415     
416 
417 
418 
419 
420 
421 
422     protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
423         Runnable task = pollTaskFrom(taskQueue);
424         if (task == null) {
425             return false;
426         }
427         for (;;) {
428             safeExecute(task);
429             task = pollTaskFrom(taskQueue);
430             if (task == null) {
431                 return true;
432             }
433         }
434     }
435 
436     
437 
438 
439 
440 
441     private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
442         Runnable task = pollTaskFrom(taskQueue);
443         if (task == null) {
444             return false;
445         }
446         int remaining = Math.min(maxPendingTasks, taskQueue.size());
447         safeExecute(task);
448         
449         
450         while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
451             safeExecute(task);
452         }
453         return true;
454     }
455 
456     
457 
458 
459 
460     protected boolean runAllTasks(long timeoutNanos) {
461         fetchFromScheduledTaskQueue();
462         Runnable task = pollTask();
463         if (task == null) {
464             afterRunningAllTasks();
465             return false;
466         }
467 
468         final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
469         long runTasks = 0;
470         long lastExecutionTime;
471         for (;;) {
472             safeExecute(task);
473 
474             runTasks ++;
475 
476             
477             
478             if ((runTasks & 0x3F) == 0) {
479                 lastExecutionTime = getCurrentTimeNanos();
480                 if (lastExecutionTime >= deadline) {
481                     break;
482                 }
483             }
484 
485             task = pollTask();
486             if (task == null) {
487                 lastExecutionTime = getCurrentTimeNanos();
488                 break;
489             }
490         }
491 
492         afterRunningAllTasks();
493         this.lastExecutionTime = lastExecutionTime;
494         return true;
495     }
496 
497     
498 
499 
500     protected void afterRunningAllTasks() { }
501 
502     
503 
504 
505     protected long delayNanos(long currentTimeNanos) {
506         currentTimeNanos -= initialNanoTime();
507 
508         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
509         if (scheduledTask == null) {
510             return SCHEDULE_PURGE_INTERVAL;
511         }
512 
513         return scheduledTask.delayNanos(currentTimeNanos);
514     }
515 
516     
517 
518 
519 
520     protected long deadlineNanos() {
521         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
522         if (scheduledTask == null) {
523             return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
524         }
525         return scheduledTask.deadlineNanos();
526     }
527 
528     
529 
530 
531 
532 
533 
534 
535     protected void updateLastExecutionTime() {
536         lastExecutionTime = getCurrentTimeNanos();
537     }
538 
539     
540 
541 
542     protected abstract void run();
543 
544     
545 
546 
547     protected void cleanup() {
548         
549     }
550 
551     protected void wakeup(boolean inEventLoop) {
552         if (!inEventLoop) {
553             
554             
555             taskQueue.offer(WAKEUP_TASK);
556         }
557     }
558 
559     @Override
560     public boolean inEventLoop(Thread thread) {
561         return thread == this.thread;
562     }
563 
564     
565 
566 
567     public void addShutdownHook(final Runnable task) {
568         if (inEventLoop()) {
569             shutdownHooks.add(task);
570         } else {
571             execute(new Runnable() {
572                 @Override
573                 public void run() {
574                     shutdownHooks.add(task);
575                 }
576             });
577         }
578     }
579 
580     
581 
582 
583     public void removeShutdownHook(final Runnable task) {
584         if (inEventLoop()) {
585             shutdownHooks.remove(task);
586         } else {
587             execute(new Runnable() {
588                 @Override
589                 public void run() {
590                     shutdownHooks.remove(task);
591                 }
592             });
593         }
594     }
595 
596     private boolean runShutdownHooks() {
597         boolean ran = false;
598         
599         while (!shutdownHooks.isEmpty()) {
600             List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
601             shutdownHooks.clear();
602             for (Runnable task: copy) {
603                 try {
604                     runTask(task);
605                 } catch (Throwable t) {
606                     logger.warn("Shutdown hook raised an exception.", t);
607                 } finally {
608                     ran = true;
609                 }
610             }
611         }
612 
613         if (ran) {
614             lastExecutionTime = getCurrentTimeNanos();
615         }
616 
617         return ran;
618     }
619 
620     @Override
621     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
622         ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
623         if (timeout < quietPeriod) {
624             throw new IllegalArgumentException(
625                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
626         }
627         ObjectUtil.checkNotNull(unit, "unit");
628 
629         if (isShuttingDown()) {
630             return terminationFuture();
631         }
632 
633         boolean inEventLoop = inEventLoop();
634         boolean wakeup;
635         int oldState;
636         for (;;) {
637             if (isShuttingDown()) {
638                 return terminationFuture();
639             }
640             int newState;
641             wakeup = true;
642             oldState = state;
643             if (inEventLoop) {
644                 newState = ST_SHUTTING_DOWN;
645             } else {
646                 switch (oldState) {
647                     case ST_NOT_STARTED:
648                     case ST_STARTED:
649                         newState = ST_SHUTTING_DOWN;
650                         break;
651                     default:
652                         newState = oldState;
653                         wakeup = false;
654                 }
655             }
656             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
657                 break;
658             }
659         }
660         gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
661         gracefulShutdownTimeout = unit.toNanos(timeout);
662 
663         if (ensureThreadStarted(oldState)) {
664             return terminationFuture;
665         }
666 
667         if (wakeup) {
668             taskQueue.offer(WAKEUP_TASK);
669             if (!addTaskWakesUp) {
670                 wakeup(inEventLoop);
671             }
672         }
673 
674         return terminationFuture();
675     }
676 
677     @Override
678     public Future<?> terminationFuture() {
679         return terminationFuture;
680     }
681 
682     @Override
683     @Deprecated
684     public void shutdown() {
685         if (isShutdown()) {
686             return;
687         }
688 
689         boolean inEventLoop = inEventLoop();
690         boolean wakeup;
691         int oldState;
692         for (;;) {
693             if (isShuttingDown()) {
694                 return;
695             }
696             int newState;
697             wakeup = true;
698             oldState = state;
699             if (inEventLoop) {
700                 newState = ST_SHUTDOWN;
701             } else {
702                 switch (oldState) {
703                     case ST_NOT_STARTED:
704                     case ST_STARTED:
705                     case ST_SHUTTING_DOWN:
706                         newState = ST_SHUTDOWN;
707                         break;
708                     default:
709                         newState = oldState;
710                         wakeup = false;
711                 }
712             }
713             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
714                 break;
715             }
716         }
717 
718         if (ensureThreadStarted(oldState)) {
719             return;
720         }
721 
722         if (wakeup) {
723             taskQueue.offer(WAKEUP_TASK);
724             if (!addTaskWakesUp) {
725                 wakeup(inEventLoop);
726             }
727         }
728     }
729 
730     @Override
731     public boolean isShuttingDown() {
732         return state >= ST_SHUTTING_DOWN;
733     }
734 
735     @Override
736     public boolean isShutdown() {
737         return state >= ST_SHUTDOWN;
738     }
739 
740     @Override
741     public boolean isTerminated() {
742         return state == ST_TERMINATED;
743     }
744 
745     
746 
747 
748     protected boolean confirmShutdown() {
749         if (!isShuttingDown()) {
750             return false;
751         }
752 
753         if (!inEventLoop()) {
754             throw new IllegalStateException("must be invoked from an event loop");
755         }
756 
757         cancelScheduledTasks();
758 
759         if (gracefulShutdownStartTime == 0) {
760             gracefulShutdownStartTime = getCurrentTimeNanos();
761         }
762 
763         if (runAllTasks() || runShutdownHooks()) {
764             if (isShutdown()) {
765                 
766                 return true;
767             }
768 
769             
770             
771             
772             if (gracefulShutdownQuietPeriod == 0) {
773                 return true;
774             }
775             taskQueue.offer(WAKEUP_TASK);
776             return false;
777         }
778 
779         final long nanoTime = getCurrentTimeNanos();
780 
781         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
782             return true;
783         }
784 
785         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
786             
787             
788             taskQueue.offer(WAKEUP_TASK);
789             try {
790                 Thread.sleep(100);
791             } catch (InterruptedException e) {
792                 
793             }
794 
795             return false;
796         }
797 
798         
799         
800         return true;
801     }
802 
803     @Override
804     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
805         ObjectUtil.checkNotNull(unit, "unit");
806         if (inEventLoop()) {
807             throw new IllegalStateException("cannot await termination of the current thread");
808         }
809 
810         threadLock.await(timeout, unit);
811 
812         return isTerminated();
813     }
814 
815     @Override
816     public void execute(Runnable task) {
817         execute0(task);
818     }
819 
820     @Override
821     public void lazyExecute(Runnable task) {
822         lazyExecute0(task);
823     }
824 
825     private void execute0(@Schedule Runnable task) {
826         ObjectUtil.checkNotNull(task, "task");
827         execute(task, wakesUpForTask(task));
828     }
829 
830     private void lazyExecute0(@Schedule Runnable task) {
831         execute(ObjectUtil.checkNotNull(task, "task"), false);
832     }
833 
834     private void execute(Runnable task, boolean immediate) {
835         boolean inEventLoop = inEventLoop();
836         addTask(task);
837         if (!inEventLoop) {
838             startThread();
839             if (isShutdown()) {
840                 boolean reject = false;
841                 try {
842                     if (removeTask(task)) {
843                         reject = true;
844                     }
845                 } catch (UnsupportedOperationException e) {
846                     
847                     
848                     
849                 }
850                 if (reject) {
851                     reject();
852                 }
853             }
854         }
855 
856         if (!addTaskWakesUp && immediate) {
857             wakeup(inEventLoop);
858         }
859     }
860 
861     @Override
862     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
863         throwIfInEventLoop("invokeAny");
864         return super.invokeAny(tasks);
865     }
866 
867     @Override
868     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
869             throws InterruptedException, ExecutionException, TimeoutException {
870         throwIfInEventLoop("invokeAny");
871         return super.invokeAny(tasks, timeout, unit);
872     }
873 
874     @Override
875     public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
876             throws InterruptedException {
877         throwIfInEventLoop("invokeAll");
878         return super.invokeAll(tasks);
879     }
880 
881     @Override
882     public <T> List<java.util.concurrent.Future<T>> invokeAll(
883             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
884         throwIfInEventLoop("invokeAll");
885         return super.invokeAll(tasks, timeout, unit);
886     }
887 
888     private void throwIfInEventLoop(String method) {
889         if (inEventLoop()) {
890             throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
891         }
892     }
893 
894     
895 
896 
897 
898 
899     public final ThreadProperties threadProperties() {
900         ThreadProperties threadProperties = this.threadProperties;
901         if (threadProperties == null) {
902             Thread thread = this.thread;
903             if (thread == null) {
904                 assert !inEventLoop();
905                 submit(NOOP_TASK).syncUninterruptibly();
906                 thread = this.thread;
907                 assert thread != null;
908             }
909 
910             threadProperties = new DefaultThreadProperties(thread);
911             if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
912                 threadProperties = this.threadProperties;
913             }
914         }
915 
916         return threadProperties;
917     }
918 
919     
920 
921 
922     @Deprecated
923     protected interface NonWakeupRunnable extends LazyRunnable { }
924 
925     
926 
927 
928 
929     protected boolean wakesUpForTask(Runnable task) {
930         return true;
931     }
932 
933     protected static void reject() {
934         throw new RejectedExecutionException("event executor terminated");
935     }
936 
937     
938 
939 
940 
941 
942     protected final void reject(Runnable task) {
943         rejectedExecutionHandler.rejected(task, this);
944     }
945 
946     
947 
948     private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
949 
950     private void startThread() {
951         if (state == ST_NOT_STARTED) {
952             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
953                 boolean success = false;
954                 try {
955                     doStartThread();
956                     success = true;
957                 } finally {
958                     if (!success) {
959                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
960                     }
961                 }
962             }
963         }
964     }
965 
966     private boolean ensureThreadStarted(int oldState) {
967         if (oldState == ST_NOT_STARTED) {
968             try {
969                 doStartThread();
970             } catch (Throwable cause) {
971                 STATE_UPDATER.set(this, ST_TERMINATED);
972                 terminationFuture.tryFailure(cause);
973 
974                 if (!(cause instanceof Exception)) {
975                     
976                     PlatformDependent.throwException(cause);
977                 }
978                 return true;
979             }
980         }
981         return false;
982     }
983 
984     private void doStartThread() {
985         assert thread == null;
986         executor.execute(new Runnable() {
987             @Override
988             public void run() {
989                 thread = Thread.currentThread();
990                 if (interrupted) {
991                     thread.interrupt();
992                 }
993 
994                 boolean success = false;
995                 Throwable unexpectedException = null;
996                 updateLastExecutionTime();
997                 try {
998                     SingleThreadEventExecutor.this.run();
999                     success = true;
1000                 } catch (Throwable t) {
1001                     unexpectedException = t;
1002                     logger.warn("Unexpected exception from an event executor: ", t);
1003                 } finally {
1004                     for (;;) {
1005                         int oldState = state;
1006                         if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
1007                                 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
1008                             break;
1009                         }
1010                     }
1011 
1012                     
1013                     if (success && gracefulShutdownStartTime == 0) {
1014                         if (logger.isErrorEnabled()) {
1015                             logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
1016                                     SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
1017                                     "be called before run() implementation terminates.");
1018                         }
1019                     }
1020 
1021                     try {
1022                         
1023                         
1024                         
1025                         for (;;) {
1026                             if (confirmShutdown()) {
1027                                 break;
1028                             }
1029                         }
1030 
1031                         
1032                         
1033                         for (;;) {
1034                             int oldState = state;
1035                             if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
1036                                     SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
1037                                 break;
1038                             }
1039                         }
1040 
1041                         
1042                         
1043                         confirmShutdown();
1044                     } finally {
1045                         try {
1046                             cleanup();
1047                         } finally {
1048                             
1049                             
1050                             
1051                             
1052                             FastThreadLocal.removeAll();
1053 
1054                             STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
1055                             threadLock.countDown();
1056                             int numUserTasks = drainTasks();
1057                             if (numUserTasks > 0 && logger.isWarnEnabled()) {
1058                                 logger.warn("An event executor terminated with " +
1059                                         "non-empty task queue (" + numUserTasks + ')');
1060                             }
1061                             if (unexpectedException == null) {
1062                                 terminationFuture.setSuccess(null);
1063                             } else {
1064                                 terminationFuture.setFailure(unexpectedException);
1065                             }
1066                         }
1067                     }
1068                 }
1069             }
1070         });
1071     }
1072 
1073     final int drainTasks() {
1074         int numTasks = 0;
1075         for (;;) {
1076             Runnable runnable = taskQueue.poll();
1077             if (runnable == null) {
1078                 break;
1079             }
1080             
1081             
1082             if (WAKEUP_TASK != runnable) {
1083                 numTasks++;
1084             }
1085         }
1086         return numTasks;
1087     }
1088 
1089     private static final class DefaultThreadProperties implements ThreadProperties {
1090         private final Thread t;
1091 
1092         DefaultThreadProperties(Thread t) {
1093             this.t = t;
1094         }
1095 
1096         @Override
1097         public State state() {
1098             return t.getState();
1099         }
1100 
1101         @Override
1102         public int priority() {
1103             return t.getPriority();
1104         }
1105 
1106         @Override
1107         public boolean isInterrupted() {
1108             return t.isInterrupted();
1109         }
1110 
1111         @Override
1112         public boolean isDaemon() {
1113             return t.isDaemon();
1114         }
1115 
1116         @Override
1117         public String name() {
1118             return t.getName();
1119         }
1120 
1121         @Override
1122         public long id() {
1123             return t.getId();
1124         }
1125 
1126         @Override
1127         public StackTraceElement[] stackTrace() {
1128             return t.getStackTrace();
1129         }
1130 
1131         @Override
1132         public boolean isAlive() {
1133             return t.isAlive();
1134         }
1135     }
1136 }