1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.SystemPropertyUtil;
21 import io.netty.util.internal.ThreadExecutorMap;
22 import io.netty.util.internal.logging.InternalLogger;
23 import io.netty.util.internal.logging.InternalLoggerFactory;
24 import org.jetbrains.annotations.Async.Schedule;
25
26 import java.lang.Thread.State;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.LinkedHashSet;
30 import java.util.List;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
45
46
47
48
49
50 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
51
/**
 * Default limit for pending tasks: the value of the system property
 * {@code io.netty.eventexecutor.maxPendingTasks} (default {@link Integer#MAX_VALUE}),
 * but never lower than 16.
 */
static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
        SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

private static final InternalLogger logger =
        InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);

// Lifecycle states. The numeric order is significant: isShuttingDown() and isShutdown()
// compare the current state with ">=".
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;

// Task that does nothing; threadProperties() submits it to force the executor thread to exist.
private static final Runnable NOOP_TASK = new Runnable() {
    @Override
    public void run() {
        // Do nothing.
    }
};

// Atomic updaters for the volatile "state" and "threadProperties" fields below.
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(
                SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");

// Queue of submitted tasks; also receives WAKEUP_TASK markers.
private final Queue<Runnable> taskQueue;

// The executor thread; null until the Runnable created in doStartThread() begins to run.
private volatile Thread thread;
// Lazily created by threadProperties() through PROPERTIES_UPDATER.
@SuppressWarnings("unused")
private volatile ThreadProperties threadProperties;
private final Executor executor;
// Set by interruptThread() when no thread exists yet; applied once the thread starts.
private volatile boolean interrupted;

// Counted down when the executor reaches ST_TERMINATED; awaited by awaitTermination().
private final CountDownLatch threadLock = new CountDownLatch(1);
// Only touched from the event loop (see addShutdownHook / removeShutdownHook).
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
// When true, the explicit wakeup(boolean) call is skipped after adding a task or requesting shutdown.
private final boolean addTaskWakesUp;
private final int maxPendingTasks;
private final RejectedExecutionHandler rejectedExecutionHandler;

// Timestamp (from getCurrentTimeNanos()) of the most recent task execution round.
private long lastExecutionTime;

// Current lifecycle state; written through STATE_UPDATER.
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;

// Graceful shutdown parameters, stored in nanoseconds by shutdownGracefully().
private volatile long gracefulShutdownQuietPeriod;
private volatile long gracefulShutdownTimeout;
// Set on the first confirmShutdown() call; 0 means "not yet set".
private long gracefulShutdownStartTime;

// Completed once the executor thread has finished its shutdown sequence (or failed to start).
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
101
102
103
104
105
106
107
108
109
/**
 * Creates a new instance whose thread is started through the given {@link ThreadFactory}.
 *
 * @param parent          the {@link EventExecutorGroup} handed to the superclass constructor
 * @param threadFactory   wrapped in a {@link ThreadPerTaskExecutor} used to start the executor thread
 * @param addTaskWakesUp  when {@code true}, {@link #wakeup(boolean)} is not called after a task is added
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
}
114
115
116
117
118
119
120
121
122
123
124
/**
 * Creates a new instance whose thread is started through the given {@link ThreadFactory}.
 *
 * @param parent          the {@link EventExecutorGroup} handed to the superclass constructor
 * @param threadFactory   wrapped in a {@link ThreadPerTaskExecutor} used to start the executor thread
 * @param addTaskWakesUp  when {@code true}, {@link #wakeup(boolean)} is not called after a task is added
 * @param maxPendingTasks capacity passed to {@link #newTaskQueue(int)}; values below 16 are raised to 16
 * @param rejectedHandler the {@link RejectedExecutionHandler} invoked when a task cannot be queued
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory,
        boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
130
131
132
133
134
135
136
137
138
/**
 * Creates a new instance using {@link #DEFAULT_MAX_PENDING_EXECUTOR_TASKS} as the queue capacity
 * and {@link RejectedExecutionHandlers#reject()} as the rejection handler.
 *
 * @param parent          the {@link EventExecutorGroup} handed to the superclass constructor
 * @param executor        the {@link Executor} used to start the executor thread
 * @param addTaskWakesUp  when {@code true}, {@link #wakeup(boolean)} is not called after a task is added
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
    this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
}
142
143
144
145
146
147
148
149
150
151
152
153 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
154 boolean addTaskWakesUp, int maxPendingTasks,
155 RejectedExecutionHandler rejectedHandler) {
156 super(parent);
157 this.addTaskWakesUp = addTaskWakesUp;
158 this.maxPendingTasks = Math.max(16, maxPendingTasks);
159 this.executor = ThreadExecutorMap.apply(executor, this);
160 taskQueue = newTaskQueue(this.maxPendingTasks);
161 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
162 }
163
164 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
165 boolean addTaskWakesUp, Queue<Runnable> taskQueue,
166 RejectedExecutionHandler rejectedHandler) {
167 super(parent);
168 this.addTaskWakesUp = addTaskWakesUp;
169 this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
170 this.executor = ThreadExecutorMap.apply(executor, this);
171 this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
172 this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
173 }
174
175
176
177
178 @Deprecated
179 protected Queue<Runnable> newTaskQueue() {
180 return newTaskQueue(maxPendingTasks);
181 }
182
183
184
185
186
187
188
189 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
190 return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
191 }
192
193
194
195
196 protected void interruptThread() {
197 Thread currentThread = thread;
198 if (currentThread == null) {
199 interrupted = true;
200 } else {
201 currentThread.interrupt();
202 }
203 }
204
205
206
207
208 protected Runnable pollTask() {
209 assert inEventLoop();
210 return pollTaskFrom(taskQueue);
211 }
212
213 protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
214 for (;;) {
215 Runnable task = taskQueue.poll();
216 if (task != WAKEUP_TASK) {
217 return task;
218 }
219 }
220 }
221
222
223
224
225
226
227
228
229
230
231 protected Runnable takeTask() {
232 assert inEventLoop();
233 if (!(taskQueue instanceof BlockingQueue)) {
234 throw new UnsupportedOperationException();
235 }
236
237 BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
238 for (;;) {
239 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
240 if (scheduledTask == null) {
241 Runnable task = null;
242 try {
243 task = taskQueue.take();
244 if (task == WAKEUP_TASK) {
245 task = null;
246 }
247 } catch (InterruptedException e) {
248
249 }
250 return task;
251 } else {
252 long delayNanos = scheduledTask.delayNanos();
253 Runnable task = null;
254 if (delayNanos > 0) {
255 try {
256 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
257 } catch (InterruptedException e) {
258
259 return null;
260 }
261 }
262 if (task == null) {
263
264
265
266
267 fetchFromScheduledTaskQueue();
268 task = taskQueue.poll();
269 }
270
271 if (task != null) {
272 return task;
273 }
274 }
275 }
276 }
277
278 private boolean fetchFromScheduledTaskQueue() {
279 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
280 return true;
281 }
282 long nanoTime = getCurrentTimeNanos();
283 for (;;) {
284 Runnable scheduledTask = pollScheduledTask(nanoTime);
285 if (scheduledTask == null) {
286 return true;
287 }
288 if (!taskQueue.offer(scheduledTask)) {
289
290 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
291 return false;
292 }
293 }
294 }
295
296
297
298
299 private boolean executeExpiredScheduledTasks() {
300 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
301 return false;
302 }
303 long nanoTime = getCurrentTimeNanos();
304 Runnable scheduledTask = pollScheduledTask(nanoTime);
305 if (scheduledTask == null) {
306 return false;
307 }
308 do {
309 safeExecute(scheduledTask);
310 } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
311 return true;
312 }
313
314
315
316
317 protected Runnable peekTask() {
318 assert inEventLoop();
319 return taskQueue.peek();
320 }
321
322
323
324
325 protected boolean hasTasks() {
326 assert inEventLoop();
327 return !taskQueue.isEmpty();
328 }
329
330
331
332
333 public int pendingTasks() {
334 return taskQueue.size();
335 }
336
337
338
339
340
341 protected void addTask(Runnable task) {
342 ObjectUtil.checkNotNull(task, "task");
343 if (!offerTask(task)) {
344 reject(task);
345 }
346 }
347
348 final boolean offerTask(Runnable task) {
349 if (isShutdown()) {
350 reject();
351 }
352 return taskQueue.offer(task);
353 }
354
355
356
357
358 protected boolean removeTask(Runnable task) {
359 return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
360 }
361
362
363
364
365
366
367 protected boolean runAllTasks() {
368 assert inEventLoop();
369 boolean fetchedAll;
370 boolean ranAtLeastOne = false;
371
372 do {
373 fetchedAll = fetchFromScheduledTaskQueue();
374 if (runAllTasksFrom(taskQueue)) {
375 ranAtLeastOne = true;
376 }
377 } while (!fetchedAll);
378
379 if (ranAtLeastOne) {
380 lastExecutionTime = getCurrentTimeNanos();
381 }
382 afterRunningAllTasks();
383 return ranAtLeastOne;
384 }
385
386
387
388
389
390
391
392
393
394 protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
395 assert inEventLoop();
396 boolean ranAtLeastOneTask;
397 int drainAttempt = 0;
398 do {
399
400
401 ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
402 } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
403
404 if (drainAttempt > 0) {
405 lastExecutionTime = getCurrentTimeNanos();
406 }
407 afterRunningAllTasks();
408
409 return drainAttempt > 0;
410 }
411
412
413
414
415
416
417
418
419 protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
420 Runnable task = pollTaskFrom(taskQueue);
421 if (task == null) {
422 return false;
423 }
424 for (;;) {
425 safeExecute(task);
426 task = pollTaskFrom(taskQueue);
427 if (task == null) {
428 return true;
429 }
430 }
431 }
432
433
434
435
436
437
438 private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
439 Runnable task = pollTaskFrom(taskQueue);
440 if (task == null) {
441 return false;
442 }
443 int remaining = Math.min(maxPendingTasks, taskQueue.size());
444 safeExecute(task);
445
446
447 while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
448 safeExecute(task);
449 }
450 return true;
451 }
452
453
454
455
456
457 protected boolean runAllTasks(long timeoutNanos) {
458 fetchFromScheduledTaskQueue();
459 Runnable task = pollTask();
460 if (task == null) {
461 afterRunningAllTasks();
462 return false;
463 }
464
465 final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
466 long runTasks = 0;
467 long lastExecutionTime;
468 for (;;) {
469 safeExecute(task);
470
471 runTasks ++;
472
473
474
475 if ((runTasks & 0x3F) == 0) {
476 lastExecutionTime = getCurrentTimeNanos();
477 if (lastExecutionTime >= deadline) {
478 break;
479 }
480 }
481
482 task = pollTask();
483 if (task == null) {
484 lastExecutionTime = getCurrentTimeNanos();
485 break;
486 }
487 }
488
489 afterRunningAllTasks();
490 this.lastExecutionTime = lastExecutionTime;
491 return true;
492 }
493
494
495
496
/**
 * Hook invoked at the end of {@link #runAllTasks()}, {@link #runAllTasks(long)} and
 * {@link #runScheduledAndExecutorTasks(int)}, including when no task was run.
 * Does nothing by default; subclasses may override it.
 */
protected void afterRunningAllTasks() { }
498
499
500
501
502 protected long delayNanos(long currentTimeNanos) {
503 currentTimeNanos -= initialNanoTime();
504
505 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
506 if (scheduledTask == null) {
507 return SCHEDULE_PURGE_INTERVAL;
508 }
509
510 return scheduledTask.delayNanos(currentTimeNanos);
511 }
512
513
514
515
516
517 protected long deadlineNanos() {
518 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
519 if (scheduledTask == null) {
520 return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
521 }
522 return scheduledTask.deadlineNanos();
523 }
524
525
526
527
528
529
530
531
532 protected void updateLastExecutionTime() {
533 lastExecutionTime = getCurrentTimeNanos();
534 }
535
536
537
538
/**
 * The body of the executor thread: invoked once on that thread by {@code doStartThread()}.
 * Implementations are expected to process tasks and to call {@link #confirmShutdown()} before
 * returning; otherwise an error is logged when this method returns normally.
 */
protected abstract void run();
540
541
542
543
/**
 * Hook invoked on the executor thread after the shutdown sequence has run and before the
 * executor is marked as terminated. Does nothing by default.
 */
protected void cleanup() {
    // NOOP
}
547
548 protected void wakeup(boolean inEventLoop) {
549 if (!inEventLoop) {
550
551
552 taskQueue.offer(WAKEUP_TASK);
553 }
554 }
555
556 @Override
557 public boolean inEventLoop(Thread thread) {
558 return thread == this.thread;
559 }
560
561
562
563
564 public void addShutdownHook(final Runnable task) {
565 if (inEventLoop()) {
566 shutdownHooks.add(task);
567 } else {
568 execute(new Runnable() {
569 @Override
570 public void run() {
571 shutdownHooks.add(task);
572 }
573 });
574 }
575 }
576
577
578
579
580 public void removeShutdownHook(final Runnable task) {
581 if (inEventLoop()) {
582 shutdownHooks.remove(task);
583 } else {
584 execute(new Runnable() {
585 @Override
586 public void run() {
587 shutdownHooks.remove(task);
588 }
589 });
590 }
591 }
592
593 private boolean runShutdownHooks() {
594 boolean ran = false;
595
596 while (!shutdownHooks.isEmpty()) {
597 List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
598 shutdownHooks.clear();
599 for (Runnable task: copy) {
600 try {
601 runTask(task);
602 } catch (Throwable t) {
603 logger.warn("Shutdown hook raised an exception.", t);
604 } finally {
605 ran = true;
606 }
607 }
608 }
609
610 if (ran) {
611 lastExecutionTime = getCurrentTimeNanos();
612 }
613
614 return ran;
615 }
616
/**
 * Requests a graceful shutdown: moves the executor to {@code ST_SHUTTING_DOWN}, records the quiet
 * period and timeout (converted to nanoseconds), makes sure the executor thread exists so the
 * shutdown sequence can run, and wakes the thread up.
 *
 * @param quietPeriod the quiet period; must be {@code >= 0}
 * @param timeout     the maximum time to wait; must be {@code >= quietPeriod}
 * @param unit        the unit of {@code quietPeriod} and {@code timeout}; must not be {@code null}
 * @return the termination future of this executor
 * @throws IllegalArgumentException if {@code timeout < quietPeriod}
 */
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
    if (timeout < quietPeriod) {
        throw new IllegalArgumentException(
                "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
    }
    ObjectUtil.checkNotNull(unit, "unit");

    // Fast path: a shutdown is already in progress.
    if (isShuttingDown()) {
        return terminationFuture();
    }

    boolean inEventLoop = inEventLoop();
    boolean wakeup;
    int oldState;
    // Retry until the state CAS succeeds or another caller is seen to have begun the shutdown.
    for (;;) {
        if (isShuttingDown()) {
            return terminationFuture();
        }
        int newState;
        wakeup = true;
        oldState = state;
        if (inEventLoop) {
            newState = ST_SHUTTING_DOWN;
        } else {
            switch (oldState) {
                case ST_NOT_STARTED:
                case ST_STARTED:
                    newState = ST_SHUTTING_DOWN;
                    break;
                default:
                    // Already further along: keep the state and skip the wake-up.
                    newState = oldState;
                    wakeup = false;
            }
        }
        if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
            break;
        }
    }
    gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
    gracefulShutdownTimeout = unit.toNanos(timeout);

    // If the thread was never started, start it now; a true result means starting failed and
    // the executor has already been marked as terminated.
    if (ensureThreadStarted(oldState)) {
        return terminationFuture;
    }

    if (wakeup) {
        taskQueue.offer(WAKEUP_TASK);
        if (!addTaskWakesUp) {
            wakeup(inEventLoop);
        }
    }

    return terminationFuture();
}
673
/**
 * Returns the future that is completed when this executor has terminated (or failed to start
 * its thread during shutdown).
 */
@Override
public Future<?> terminationFuture() {
    return terminationFuture;
}
678
/**
 * Requests a shutdown by moving the executor to {@code ST_SHUTDOWN}, starting the executor
 * thread if it was never started and waking it up.
 *
 * @deprecated this method is marked {@link Deprecated};
 *             {@link #shutdownGracefully(long, long, TimeUnit)} is the alternative in this class.
 */
@Override
@Deprecated
public void shutdown() {
    if (isShutdown()) {
        return;
    }

    boolean inEventLoop = inEventLoop();
    boolean wakeup;
    int oldState;
    // Retry until the state CAS succeeds.
    for (;;) {
        // NOTE(review): this checks isShuttingDown(), not isShutdown(), so the method returns
        // without effect once a graceful shutdown has begun and the ST_SHUTTING_DOWN case
        // below cannot be reached from here. Confirm whether that is intended.
        if (isShuttingDown()) {
            return;
        }
        int newState;
        wakeup = true;
        oldState = state;
        if (inEventLoop) {
            newState = ST_SHUTDOWN;
        } else {
            switch (oldState) {
                case ST_NOT_STARTED:
                case ST_STARTED:
                case ST_SHUTTING_DOWN:
                    newState = ST_SHUTDOWN;
                    break;
                default:
                    // Already shut down or terminated: keep the state and skip the wake-up.
                    newState = oldState;
                    wakeup = false;
            }
        }
        if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
            break;
        }
    }

    // If the thread was never started, start it now; a true result means starting failed.
    if (ensureThreadStarted(oldState)) {
        return;
    }

    if (wakeup) {
        taskQueue.offer(WAKEUP_TASK);
        if (!addTaskWakesUp) {
            wakeup(inEventLoop);
        }
    }
}
726
727 @Override
728 public boolean isShuttingDown() {
729 return state >= ST_SHUTTING_DOWN;
730 }
731
732 @Override
733 public boolean isShutdown() {
734 return state >= ST_SHUTDOWN;
735 }
736
737 @Override
738 public boolean isTerminated() {
739 return state == ST_TERMINATED;
740 }
741
742
743
744
/**
 * Confirms that the shutdown, if one was requested, is ready to run or is in progress: cancels
 * scheduled tasks, runs pending tasks and shutdown hooks, and evaluates the graceful shutdown
 * quiet period and timeout. Must be called from the event loop.
 *
 * @return {@code true} if the executor may terminate now; {@code false} if no shutdown was
 *         requested or the caller should keep processing and call this method again
 * @throws IllegalStateException if a shutdown is in progress and the caller is not the event loop
 */
protected boolean confirmShutdown() {
    if (!isShuttingDown()) {
        return false;
    }

    if (!inEventLoop()) {
        throw new IllegalStateException("must be invoked from an event loop");
    }

    cancelScheduledTasks();

    // Remember when the shutdown sequence was first observed; 0 means "not set yet".
    if (gracefulShutdownStartTime == 0) {
        gracefulShutdownStartTime = getCurrentTimeNanos();
    }

    if (runAllTasks() || runShutdownHooks()) {
        if (isShutdown()) {
            // The executor is shut down: offerTask() rejects new tasks from now on.
            return true;
        }

        // Something ran during this call. Either terminate right away when no quiet period
        // was requested, or ask to be called again so the quiet period can be observed.
        if (gracefulShutdownQuietPeriod == 0) {
            return true;
        }
        taskQueue.offer(WAKEUP_TASK);
        return false;
    }

    final long nanoTime = getCurrentTimeNanos();

    // Nothing ran: terminate if shut down or if the overall timeout has been exceeded.
    if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
        return true;
    }

    if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
        // Still inside the quiet period: leave a wake-up marker in the queue, pause for
        // 100ms and report "not yet" so the caller checks again.
        taskQueue.offer(WAKEUP_TASK);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Ignored: the caller re-invokes this method either way.
        }

        return false;
    }

    // No task ran during the last quiet period. This cannot rule out later execute() calls
    // by a user, but it is the agreed condition for terminating.
    return true;
}
799
800 @Override
801 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
802 ObjectUtil.checkNotNull(unit, "unit");
803 if (inEventLoop()) {
804 throw new IllegalStateException("cannot await termination of the current thread");
805 }
806
807 threadLock.await(timeout, unit);
808
809 return isTerminated();
810 }
811
/**
 * Queues the given task for execution on the executor thread, starting the thread if needed.
 * Whether the thread is woken up immediately depends on {@link #wakesUpForTask(Runnable)}.
 *
 * @param task the task to run; must not be {@code null}
 */
@Override
public void execute(Runnable task) {
    execute0(task);
}
816
/**
 * Queues the given task like {@link #execute(Runnable)} but never triggers an explicit wake-up
 * of the executor thread for it.
 *
 * @param task the task to run; must not be {@code null}
 */
@Override
public void lazyExecute(Runnable task) {
    lazyExecute0(task);
}
821
822 private void execute0(@Schedule Runnable task) {
823 ObjectUtil.checkNotNull(task, "task");
824 execute(task, wakesUpForTask(task));
825 }
826
827 private void lazyExecute0(@Schedule Runnable task) {
828 execute(ObjectUtil.checkNotNull(task, "task"), false);
829 }
830
831 private void execute(Runnable task, boolean immediate) {
832 boolean inEventLoop = inEventLoop();
833 addTask(task);
834 if (!inEventLoop) {
835 startThread();
836 if (isShutdown()) {
837 boolean reject = false;
838 try {
839 if (removeTask(task)) {
840 reject = true;
841 }
842 } catch (UnsupportedOperationException e) {
843
844
845
846 }
847 if (reject) {
848 reject();
849 }
850 }
851 }
852
853 if (!addTaskWakesUp && immediate) {
854 wakeup(inEventLoop);
855 }
856 }
857
858 @Override
859 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
860 throwIfInEventLoop("invokeAny");
861 return super.invokeAny(tasks);
862 }
863
864 @Override
865 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
866 throws InterruptedException, ExecutionException, TimeoutException {
867 throwIfInEventLoop("invokeAny");
868 return super.invokeAny(tasks, timeout, unit);
869 }
870
871 @Override
872 public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
873 throws InterruptedException {
874 throwIfInEventLoop("invokeAll");
875 return super.invokeAll(tasks);
876 }
877
878 @Override
879 public <T> List<java.util.concurrent.Future<T>> invokeAll(
880 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
881 throwIfInEventLoop("invokeAll");
882 return super.invokeAll(tasks, timeout, unit);
883 }
884
885 private void throwIfInEventLoop(String method) {
886 if (inEventLoop()) {
887 throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
888 }
889 }
890
891
892
893
894
895
896 public final ThreadProperties threadProperties() {
897 ThreadProperties threadProperties = this.threadProperties;
898 if (threadProperties == null) {
899 Thread thread = this.thread;
900 if (thread == null) {
901 assert !inEventLoop();
902 submit(NOOP_TASK).syncUninterruptibly();
903 thread = this.thread;
904 assert thread != null;
905 }
906
907 threadProperties = new DefaultThreadProperties(thread);
908 if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
909 threadProperties = this.threadProperties;
910 }
911 }
912
913 return threadProperties;
914 }
915
916
917
918
/**
 * Marker interface that adds nothing to {@link LazyRunnable}.
 *
 * @deprecated this interface is marked {@link Deprecated}; it only extends {@link LazyRunnable},
 *             which can be used directly.
 */
@Deprecated
protected interface NonWakeupRunnable extends LazyRunnable { }
921
922
923
924
925
/**
 * Decides whether adding the given task through {@link #execute(Runnable)} should wake up the
 * executor thread. Subclasses may override this; the default is to always wake up.
 *
 * @param task the task being added
 * @return {@code true} in this implementation
 */
protected boolean wakesUpForTask(Runnable task) {
    return true;
}
929
930 protected static void reject() {
931 throw new RejectedExecutionException("event executor terminated");
932 }
933
934
935
936
937
938
/**
 * Hands the given task to the configured {@link RejectedExecutionHandler}.
 *
 * @param task the task that could not be queued
 */
protected final void reject(Runnable task) {
    rejectedExecutionHandler.rejected(task, this);
}
942
943
944
// One second in nanoseconds; used by delayNanos() and deadlineNanos() as the fallback interval
// when no scheduled task is pending.
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
946
947 private void startThread() {
948 if (state == ST_NOT_STARTED) {
949 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
950 boolean success = false;
951 try {
952 doStartThread();
953 success = true;
954 } finally {
955 if (!success) {
956 STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
957 }
958 }
959 }
960 }
961 }
962
963 private boolean ensureThreadStarted(int oldState) {
964 if (oldState == ST_NOT_STARTED) {
965 try {
966 doStartThread();
967 } catch (Throwable cause) {
968 STATE_UPDATER.set(this, ST_TERMINATED);
969 terminationFuture.tryFailure(cause);
970
971 if (!(cause instanceof Exception)) {
972
973 PlatformDependent.throwException(cause);
974 }
975 return true;
976 }
977 }
978 return false;
979 }
980
/**
 * Submits to {@code executor} the Runnable that forms the life of the executor thread: it
 * publishes the thread, invokes {@link #run()}, then performs the shutdown sequence (run the
 * remaining tasks and hooks, switch to {@code ST_SHUTDOWN}, final pass, {@link #cleanup()},
 * mark {@code ST_TERMINATED}, release waiters and complete the termination future).
 */
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            // Apply an interrupt that was requested before the thread existed.
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // Make sure the state is at least ST_SHUTTING_DOWN, however run() ended.
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // gracefulShutdownStartTime is only set by confirmShutdown(); still being 0
                // after a normal return means run() never called it.
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                "be called before run() implementation terminates.");
                    }
                }

                try {
                    // Run remaining tasks and shutdown hooks. While the state is still
                    // ST_SHUTTING_DOWN, offerTask() keeps accepting tasks, which the quiet
                    // period handling in confirmShutdown() relies on.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }

                    // Switch to ST_SHUTDOWN so that offerTask() rejects anything submitted
                    // from this point on.
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                            break;
                        }
                    }

                    // Final pass over the tasks that were queued before the state switch;
                    // no loop is needed here.
                    confirmShutdown();
                } finally {
                    try {
                        cleanup();
                    } finally {
                        // Remove this thread's FastThreadLocals before the termination is
                        // announced to any waiter.
                        FastThreadLocal.removeAll();

                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.countDown();
                        int numUserTasks = drainTasks();
                        if (numUserTasks > 0 && logger.isWarnEnabled()) {
                            logger.warn("An event executor terminated with " +
                                    "non-empty task queue (" + numUserTasks + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}
1063
1064 final int drainTasks() {
1065 int numTasks = 0;
1066 for (;;) {
1067 Runnable runnable = taskQueue.poll();
1068 if (runnable == null) {
1069 break;
1070 }
1071
1072
1073 if (WAKEUP_TASK != runnable) {
1074 numTasks++;
1075 }
1076 }
1077 return numTasks;
1078 }
1079
1080 private static final class DefaultThreadProperties implements ThreadProperties {
1081 private final Thread t;
1082
1083 DefaultThreadProperties(Thread t) {
1084 this.t = t;
1085 }
1086
1087 @Override
1088 public State state() {
1089 return t.getState();
1090 }
1091
1092 @Override
1093 public int priority() {
1094 return t.getPriority();
1095 }
1096
1097 @Override
1098 public boolean isInterrupted() {
1099 return t.isInterrupted();
1100 }
1101
1102 @Override
1103 public boolean isDaemon() {
1104 return t.isDaemon();
1105 }
1106
1107 @Override
1108 public String name() {
1109 return t.getName();
1110 }
1111
1112 @Override
1113 public long id() {
1114 return t.getId();
1115 }
1116
1117 @Override
1118 public StackTraceElement[] stackTrace() {
1119 return t.getStackTrace();
1120 }
1121
1122 @Override
1123 public boolean isAlive() {
1124 return t.isAlive();
1125 }
1126 }
1127 }