1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.SystemPropertyUtil;
21 import io.netty.util.internal.ThreadExecutorMap;
22 import io.netty.util.internal.logging.InternalLogger;
23 import io.netty.util.internal.logging.InternalLoggerFactory;
24 import org.jetbrains.annotations.Async.Schedule;
25
26 import java.lang.Thread.State;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.LinkedHashSet;
30 import java.util.List;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
45
46
47
48
49
50 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
51
52 static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
53 SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
54
55 private static final InternalLogger logger =
56 InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
57
58 private static final int ST_NOT_STARTED = 1;
59 private static final int ST_STARTED = 2;
60 private static final int ST_SHUTTING_DOWN = 3;
61 private static final int ST_SHUTDOWN = 4;
62 private static final int ST_TERMINATED = 5;
63
64 private static final Runnable NOOP_TASK = new Runnable() {
65 @Override
66 public void run() {
67
68 }
69 };
70
71 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
72 AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
73 private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
74 AtomicReferenceFieldUpdater.newUpdater(
75 SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
76
77 private final Queue<Runnable> taskQueue;
78
79 private volatile Thread thread;
80 @SuppressWarnings("unused")
81 private volatile ThreadProperties threadProperties;
82 private final Executor executor;
83 private volatile boolean interrupted;
84
85 private final CountDownLatch threadLock = new CountDownLatch(1);
86 private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
87 private final boolean addTaskWakesUp;
88 private final int maxPendingTasks;
89 private final RejectedExecutionHandler rejectedExecutionHandler;
90
91 private long lastExecutionTime;
92
93 @SuppressWarnings({ "FieldMayBeFinal", "unused" })
94 private volatile int state = ST_NOT_STARTED;
95
96 private volatile long gracefulShutdownQuietPeriod;
97 private volatile long gracefulShutdownTimeout;
98 private long gracefulShutdownStartTime;
99
100 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
101
102
103
104
105
106
107
108
109
110 protected SingleThreadEventExecutor(
111 EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
112 this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
113 }
114
115
116
117
118
119
120
121
122
123
124
125 protected SingleThreadEventExecutor(
126 EventExecutorGroup parent, ThreadFactory threadFactory,
127 boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
128 this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
129 }
130
131
132
133
134
135
136
137
138
139 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
140 this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
141 }
142
143
144
145
146
147
148
149
150
151
152
153 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
154 boolean addTaskWakesUp, int maxPendingTasks,
155 RejectedExecutionHandler rejectedHandler) {
156 super(parent);
157 this.addTaskWakesUp = addTaskWakesUp;
158 this.maxPendingTasks = Math.max(16, maxPendingTasks);
159 this.executor = ThreadExecutorMap.apply(executor, this);
160 taskQueue = newTaskQueue(this.maxPendingTasks);
161 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
162 }
163
164 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
165 boolean addTaskWakesUp, Queue<Runnable> taskQueue,
166 RejectedExecutionHandler rejectedHandler) {
167 super(parent);
168 this.addTaskWakesUp = addTaskWakesUp;
169 this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
170 this.executor = ThreadExecutorMap.apply(executor, this);
171 this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
172 this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
173 }
174
175
176
177
178 @Deprecated
179 protected Queue<Runnable> newTaskQueue() {
180 return newTaskQueue(maxPendingTasks);
181 }
182
183
184
185
186
187
188
189 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
190 return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
191 }
192
193
194
195
196 protected void interruptThread() {
197 Thread currentThread = thread;
198 if (currentThread == null) {
199 interrupted = true;
200 } else {
201 currentThread.interrupt();
202 }
203 }
204
205
206
207
208 protected Runnable pollTask() {
209 assert inEventLoop();
210 return pollTaskFrom(taskQueue);
211 }
212
213 protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
214 for (;;) {
215 Runnable task = taskQueue.poll();
216 if (task != WAKEUP_TASK) {
217 return task;
218 }
219 }
220 }
221
222
223
224
225
226
227
228
229
230
231 protected Runnable takeTask() {
232 assert inEventLoop();
233 if (!(taskQueue instanceof BlockingQueue)) {
234 throw new UnsupportedOperationException();
235 }
236
237 BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
238 for (;;) {
239 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
240 if (scheduledTask == null) {
241 Runnable task = null;
242 try {
243 task = taskQueue.take();
244 if (task == WAKEUP_TASK) {
245 task = null;
246 }
247 } catch (InterruptedException e) {
248
249 }
250 return task;
251 } else {
252 long delayNanos = scheduledTask.delayNanos();
253 Runnable task = null;
254 if (delayNanos > 0) {
255 try {
256 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
257 } catch (InterruptedException e) {
258
259 return null;
260 }
261 }
262 if (task == null) {
263
264
265
266
267 fetchFromScheduledTaskQueue();
268 task = taskQueue.poll();
269 }
270
271 if (task != null) {
272 if (task == WAKEUP_TASK) {
273 return null;
274 }
275 return task;
276 }
277 }
278 }
279 }
280
281 private boolean fetchFromScheduledTaskQueue() {
282 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
283 return true;
284 }
285 long nanoTime = getCurrentTimeNanos();
286 for (;;) {
287 Runnable scheduledTask = pollScheduledTask(nanoTime);
288 if (scheduledTask == null) {
289 return true;
290 }
291 if (!taskQueue.offer(scheduledTask)) {
292
293 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
294 return false;
295 }
296 }
297 }
298
299
300
301
302 private boolean executeExpiredScheduledTasks() {
303 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
304 return false;
305 }
306 long nanoTime = getCurrentTimeNanos();
307 Runnable scheduledTask = pollScheduledTask(nanoTime);
308 if (scheduledTask == null) {
309 return false;
310 }
311 do {
312 safeExecute(scheduledTask);
313 } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
314 return true;
315 }
316
317
318
319
320 protected Runnable peekTask() {
321 assert inEventLoop();
322 return taskQueue.peek();
323 }
324
325
326
327
328 protected boolean hasTasks() {
329 assert inEventLoop();
330 return !taskQueue.isEmpty();
331 }
332
333
334
335
336 public int pendingTasks() {
337 return taskQueue.size();
338 }
339
340
341
342
343
344 protected void addTask(Runnable task) {
345 ObjectUtil.checkNotNull(task, "task");
346 if (!offerTask(task)) {
347 reject(task);
348 }
349 }
350
351 final boolean offerTask(Runnable task) {
352 if (isShutdown()) {
353 reject();
354 }
355 return taskQueue.offer(task);
356 }
357
358
359
360
361 protected boolean removeTask(Runnable task) {
362 return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
363 }
364
365
366
367
368
369
370 protected boolean runAllTasks() {
371 assert inEventLoop();
372 boolean fetchedAll;
373 boolean ranAtLeastOne = false;
374
375 do {
376 fetchedAll = fetchFromScheduledTaskQueue();
377 if (runAllTasksFrom(taskQueue)) {
378 ranAtLeastOne = true;
379 }
380 } while (!fetchedAll);
381
382 if (ranAtLeastOne) {
383 lastExecutionTime = getCurrentTimeNanos();
384 }
385 afterRunningAllTasks();
386 return ranAtLeastOne;
387 }
388
389
390
391
392
393
394
395
396
397 protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
398 assert inEventLoop();
399 boolean ranAtLeastOneTask;
400 int drainAttempt = 0;
401 do {
402
403
404 ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
405 } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
406
407 if (drainAttempt > 0) {
408 lastExecutionTime = getCurrentTimeNanos();
409 }
410 afterRunningAllTasks();
411
412 return drainAttempt > 0;
413 }
414
415
416
417
418
419
420
421
422 protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
423 Runnable task = pollTaskFrom(taskQueue);
424 if (task == null) {
425 return false;
426 }
427 for (;;) {
428 safeExecute(task);
429 task = pollTaskFrom(taskQueue);
430 if (task == null) {
431 return true;
432 }
433 }
434 }
435
436
437
438
439
440
441 private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
442 Runnable task = pollTaskFrom(taskQueue);
443 if (task == null) {
444 return false;
445 }
446 int remaining = Math.min(maxPendingTasks, taskQueue.size());
447 safeExecute(task);
448
449
450 while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
451 safeExecute(task);
452 }
453 return true;
454 }
455
456
457
458
459
460 protected boolean runAllTasks(long timeoutNanos) {
461 fetchFromScheduledTaskQueue();
462 Runnable task = pollTask();
463 if (task == null) {
464 afterRunningAllTasks();
465 return false;
466 }
467
468 final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
469 long runTasks = 0;
470 long lastExecutionTime;
471 for (;;) {
472 safeExecute(task);
473
474 runTasks ++;
475
476
477
478 if ((runTasks & 0x3F) == 0) {
479 lastExecutionTime = getCurrentTimeNanos();
480 if (lastExecutionTime >= deadline) {
481 break;
482 }
483 }
484
485 task = pollTask();
486 if (task == null) {
487 lastExecutionTime = getCurrentTimeNanos();
488 break;
489 }
490 }
491
492 afterRunningAllTasks();
493 this.lastExecutionTime = lastExecutionTime;
494 return true;
495 }
496
497
498
499
500 protected void afterRunningAllTasks() { }
501
502
503
504
505 protected long delayNanos(long currentTimeNanos) {
506 currentTimeNanos -= initialNanoTime();
507
508 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
509 if (scheduledTask == null) {
510 return SCHEDULE_PURGE_INTERVAL;
511 }
512
513 return scheduledTask.delayNanos(currentTimeNanos);
514 }
515
516
517
518
519
520 protected long deadlineNanos() {
521 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
522 if (scheduledTask == null) {
523 return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
524 }
525 return scheduledTask.deadlineNanos();
526 }
527
528
529
530
531
532
533
534
535 protected void updateLastExecutionTime() {
536 lastExecutionTime = getCurrentTimeNanos();
537 }
538
539
540
541
542 protected abstract void run();
543
544
545
546
547 protected void cleanup() {
548
549 }
550
551 protected void wakeup(boolean inEventLoop) {
552 if (!inEventLoop) {
553
554
555 taskQueue.offer(WAKEUP_TASK);
556 }
557 }
558
559 @Override
560 public boolean inEventLoop(Thread thread) {
561 return thread == this.thread;
562 }
563
564
565
566
567 public void addShutdownHook(final Runnable task) {
568 if (inEventLoop()) {
569 shutdownHooks.add(task);
570 } else {
571 execute(new Runnable() {
572 @Override
573 public void run() {
574 shutdownHooks.add(task);
575 }
576 });
577 }
578 }
579
580
581
582
583 public void removeShutdownHook(final Runnable task) {
584 if (inEventLoop()) {
585 shutdownHooks.remove(task);
586 } else {
587 execute(new Runnable() {
588 @Override
589 public void run() {
590 shutdownHooks.remove(task);
591 }
592 });
593 }
594 }
595
596 private boolean runShutdownHooks() {
597 boolean ran = false;
598
599 while (!shutdownHooks.isEmpty()) {
600 List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
601 shutdownHooks.clear();
602 for (Runnable task: copy) {
603 try {
604 runTask(task);
605 } catch (Throwable t) {
606 logger.warn("Shutdown hook raised an exception.", t);
607 } finally {
608 ran = true;
609 }
610 }
611 }
612
613 if (ran) {
614 lastExecutionTime = getCurrentTimeNanos();
615 }
616
617 return ran;
618 }
619
620 @Override
621 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
622 ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
623 if (timeout < quietPeriod) {
624 throw new IllegalArgumentException(
625 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
626 }
627 ObjectUtil.checkNotNull(unit, "unit");
628
629 if (isShuttingDown()) {
630 return terminationFuture();
631 }
632
633 boolean inEventLoop = inEventLoop();
634 boolean wakeup;
635 int oldState;
636 for (;;) {
637 if (isShuttingDown()) {
638 return terminationFuture();
639 }
640 int newState;
641 wakeup = true;
642 oldState = state;
643 if (inEventLoop) {
644 newState = ST_SHUTTING_DOWN;
645 } else {
646 switch (oldState) {
647 case ST_NOT_STARTED:
648 case ST_STARTED:
649 newState = ST_SHUTTING_DOWN;
650 break;
651 default:
652 newState = oldState;
653 wakeup = false;
654 }
655 }
656 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
657 break;
658 }
659 }
660 gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
661 gracefulShutdownTimeout = unit.toNanos(timeout);
662
663 if (ensureThreadStarted(oldState)) {
664 return terminationFuture;
665 }
666
667 if (wakeup) {
668 taskQueue.offer(WAKEUP_TASK);
669 if (!addTaskWakesUp) {
670 wakeup(inEventLoop);
671 }
672 }
673
674 return terminationFuture();
675 }
676
677 @Override
678 public Future<?> terminationFuture() {
679 return terminationFuture;
680 }
681
682 @Override
683 @Deprecated
684 public void shutdown() {
685 if (isShutdown()) {
686 return;
687 }
688
689 boolean inEventLoop = inEventLoop();
690 boolean wakeup;
691 int oldState;
692 for (;;) {
693 if (isShuttingDown()) {
694 return;
695 }
696 int newState;
697 wakeup = true;
698 oldState = state;
699 if (inEventLoop) {
700 newState = ST_SHUTDOWN;
701 } else {
702 switch (oldState) {
703 case ST_NOT_STARTED:
704 case ST_STARTED:
705 case ST_SHUTTING_DOWN:
706 newState = ST_SHUTDOWN;
707 break;
708 default:
709 newState = oldState;
710 wakeup = false;
711 }
712 }
713 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
714 break;
715 }
716 }
717
718 if (ensureThreadStarted(oldState)) {
719 return;
720 }
721
722 if (wakeup) {
723 taskQueue.offer(WAKEUP_TASK);
724 if (!addTaskWakesUp) {
725 wakeup(inEventLoop);
726 }
727 }
728 }
729
730 @Override
731 public boolean isShuttingDown() {
732 return state >= ST_SHUTTING_DOWN;
733 }
734
735 @Override
736 public boolean isShutdown() {
737 return state >= ST_SHUTDOWN;
738 }
739
740 @Override
741 public boolean isTerminated() {
742 return state == ST_TERMINATED;
743 }
744
745
746
747
748 protected boolean confirmShutdown() {
749 if (!isShuttingDown()) {
750 return false;
751 }
752
753 if (!inEventLoop()) {
754 throw new IllegalStateException("must be invoked from an event loop");
755 }
756
757 cancelScheduledTasks();
758
759 if (gracefulShutdownStartTime == 0) {
760 gracefulShutdownStartTime = getCurrentTimeNanos();
761 }
762
763 if (runAllTasks() || runShutdownHooks()) {
764 if (isShutdown()) {
765
766 return true;
767 }
768
769
770
771
772 if (gracefulShutdownQuietPeriod == 0) {
773 return true;
774 }
775 taskQueue.offer(WAKEUP_TASK);
776 return false;
777 }
778
779 final long nanoTime = getCurrentTimeNanos();
780
781 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
782 return true;
783 }
784
785 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
786
787
788 taskQueue.offer(WAKEUP_TASK);
789 try {
790 Thread.sleep(100);
791 } catch (InterruptedException e) {
792
793 }
794
795 return false;
796 }
797
798
799
800 return true;
801 }
802
803 @Override
804 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
805 ObjectUtil.checkNotNull(unit, "unit");
806 if (inEventLoop()) {
807 throw new IllegalStateException("cannot await termination of the current thread");
808 }
809
810 threadLock.await(timeout, unit);
811
812 return isTerminated();
813 }
814
815 @Override
816 public void execute(Runnable task) {
817 execute0(task);
818 }
819
820 @Override
821 public void lazyExecute(Runnable task) {
822 lazyExecute0(task);
823 }
824
825 private void execute0(@Schedule Runnable task) {
826 ObjectUtil.checkNotNull(task, "task");
827 execute(task, wakesUpForTask(task));
828 }
829
830 private void lazyExecute0(@Schedule Runnable task) {
831 execute(ObjectUtil.checkNotNull(task, "task"), false);
832 }
833
834 private void execute(Runnable task, boolean immediate) {
835 boolean inEventLoop = inEventLoop();
836 addTask(task);
837 if (!inEventLoop) {
838 startThread();
839 if (isShutdown()) {
840 boolean reject = false;
841 try {
842 if (removeTask(task)) {
843 reject = true;
844 }
845 } catch (UnsupportedOperationException e) {
846
847
848
849 }
850 if (reject) {
851 reject();
852 }
853 }
854 }
855
856 if (!addTaskWakesUp && immediate) {
857 wakeup(inEventLoop);
858 }
859 }
860
861 @Override
862 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
863 throwIfInEventLoop("invokeAny");
864 return super.invokeAny(tasks);
865 }
866
867 @Override
868 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
869 throws InterruptedException, ExecutionException, TimeoutException {
870 throwIfInEventLoop("invokeAny");
871 return super.invokeAny(tasks, timeout, unit);
872 }
873
874 @Override
875 public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
876 throws InterruptedException {
877 throwIfInEventLoop("invokeAll");
878 return super.invokeAll(tasks);
879 }
880
881 @Override
882 public <T> List<java.util.concurrent.Future<T>> invokeAll(
883 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
884 throwIfInEventLoop("invokeAll");
885 return super.invokeAll(tasks, timeout, unit);
886 }
887
888 private void throwIfInEventLoop(String method) {
889 if (inEventLoop()) {
890 throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
891 }
892 }
893
894
895
896
897
898
899 public final ThreadProperties threadProperties() {
900 ThreadProperties threadProperties = this.threadProperties;
901 if (threadProperties == null) {
902 Thread thread = this.thread;
903 if (thread == null) {
904 assert !inEventLoop();
905 submit(NOOP_TASK).syncUninterruptibly();
906 thread = this.thread;
907 assert thread != null;
908 }
909
910 threadProperties = new DefaultThreadProperties(thread);
911 if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
912 threadProperties = this.threadProperties;
913 }
914 }
915
916 return threadProperties;
917 }
918
919
920
921
922 @Deprecated
923 protected interface NonWakeupRunnable extends LazyRunnable { }
924
925
926
927
928
929 protected boolean wakesUpForTask(Runnable task) {
930 return true;
931 }
932
933 protected static void reject() {
934 throw new RejectedExecutionException("event executor terminated");
935 }
936
937
938
939
940
941
942 protected final void reject(Runnable task) {
943 rejectedExecutionHandler.rejected(task, this);
944 }
945
946
947
948 private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
949
950 private void startThread() {
951 if (state == ST_NOT_STARTED) {
952 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
953 boolean success = false;
954 try {
955 doStartThread();
956 success = true;
957 } finally {
958 if (!success) {
959 STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
960 }
961 }
962 }
963 }
964 }
965
966 private boolean ensureThreadStarted(int oldState) {
967 if (oldState == ST_NOT_STARTED) {
968 try {
969 doStartThread();
970 } catch (Throwable cause) {
971 STATE_UPDATER.set(this, ST_TERMINATED);
972 terminationFuture.tryFailure(cause);
973
974 if (!(cause instanceof Exception)) {
975
976 PlatformDependent.throwException(cause);
977 }
978 return true;
979 }
980 }
981 return false;
982 }
983
984 private void doStartThread() {
985 assert thread == null;
986 executor.execute(new Runnable() {
987 @Override
988 public void run() {
989 thread = Thread.currentThread();
990 if (interrupted) {
991 thread.interrupt();
992 }
993
994 boolean success = false;
995 updateLastExecutionTime();
996 try {
997 SingleThreadEventExecutor.this.run();
998 success = true;
999 } catch (Throwable t) {
1000 logger.warn("Unexpected exception from an event executor: ", t);
1001 } finally {
1002 for (;;) {
1003 int oldState = state;
1004 if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
1005 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
1006 break;
1007 }
1008 }
1009
1010
1011 if (success && gracefulShutdownStartTime == 0) {
1012 if (logger.isErrorEnabled()) {
1013 logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
1014 SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
1015 "be called before run() implementation terminates.");
1016 }
1017 }
1018
1019 try {
1020
1021
1022
1023 for (;;) {
1024 if (confirmShutdown()) {
1025 break;
1026 }
1027 }
1028
1029
1030
1031 for (;;) {
1032 int oldState = state;
1033 if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
1034 SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
1035 break;
1036 }
1037 }
1038
1039
1040
1041 confirmShutdown();
1042 } finally {
1043 try {
1044 cleanup();
1045 } finally {
1046
1047
1048
1049
1050 FastThreadLocal.removeAll();
1051
1052 STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
1053 threadLock.countDown();
1054 int numUserTasks = drainTasks();
1055 if (numUserTasks > 0 && logger.isWarnEnabled()) {
1056 logger.warn("An event executor terminated with " +
1057 "non-empty task queue (" + numUserTasks + ')');
1058 }
1059 terminationFuture.setSuccess(null);
1060 }
1061 }
1062 }
1063 }
1064 });
1065 }
1066
1067 final int drainTasks() {
1068 int numTasks = 0;
1069 for (;;) {
1070 Runnable runnable = taskQueue.poll();
1071 if (runnable == null) {
1072 break;
1073 }
1074
1075
1076 if (WAKEUP_TASK != runnable) {
1077 numTasks++;
1078 }
1079 }
1080 return numTasks;
1081 }
1082
1083 private static final class DefaultThreadProperties implements ThreadProperties {
1084 private final Thread t;
1085
1086 DefaultThreadProperties(Thread t) {
1087 this.t = t;
1088 }
1089
1090 @Override
1091 public State state() {
1092 return t.getState();
1093 }
1094
1095 @Override
1096 public int priority() {
1097 return t.getPriority();
1098 }
1099
1100 @Override
1101 public boolean isInterrupted() {
1102 return t.isInterrupted();
1103 }
1104
1105 @Override
1106 public boolean isDaemon() {
1107 return t.isDaemon();
1108 }
1109
1110 @Override
1111 public String name() {
1112 return t.getName();
1113 }
1114
1115 @Override
1116 public long id() {
1117 return t.getId();
1118 }
1119
1120 @Override
1121 public StackTraceElement[] stackTrace() {
1122 return t.getStackTrace();
1123 }
1124
1125 @Override
1126 public boolean isAlive() {
1127 return t.isAlive();
1128 }
1129 }
1130 }