1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.SystemPropertyUtil;
21 import io.netty.util.internal.ThreadExecutorMap;
22 import io.netty.util.internal.logging.InternalLogger;
23 import io.netty.util.internal.logging.InternalLoggerFactory;
24 import org.jetbrains.annotations.Async.Schedule;
25
26 import java.lang.Thread.State;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.LinkedHashSet;
30 import java.util.List;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
45 import java.util.concurrent.locks.Lock;
46 import java.util.concurrent.locks.ReentrantLock;
47
48
49
50
51
52 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
53
// Default cap on queued tasks: read from the "io.netty.eventexecutor.maxPendingTasks" system property
// (Integer.MAX_VALUE is passed as the fallback value) and never lower than 16.
static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
        SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

private static final InternalLogger logger =
        InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);

// Lifecycle states held in the volatile 'state' field. Their numeric order is significant:
// isShuttingDown() and isShutdown() use >= comparisons against these values.
private static final int ST_NOT_STARTED = 1;
private static final int ST_SUSPENDING = 2;
private static final int ST_SUSPENDED = 3;
private static final int ST_STARTED = 4;
private static final int ST_SHUTTING_DOWN = 5;
private static final int ST_SHUTDOWN = 6;
private static final int ST_TERMINATED = 7;

// Task that does nothing; threadProperties() submits it and waits for it when no thread is bound yet.
private static final Runnable NOOP_TASK = new Runnable() {
    @Override
    public void run() {
        // Do nothing.
    }
};

// Updaters giving atomic (CAS) access to the volatile 'state' and 'threadProperties' fields.
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(
                SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
// Queue of tasks submitted to this executor.
private final Queue<Runnable> taskQueue;

// Thread currently running the loop; assigned and reset to null inside doStartThread().
private volatile Thread thread;
// Lazily created by threadProperties() through PROPERTIES_UPDATER.
@SuppressWarnings("unused")
private volatile ThreadProperties threadProperties;
// Used by doStartThread() to launch the loop runnable.
private final Executor executor;
// Set by interruptThread() when no thread exists yet; applied and cleared in doStartThread().
private volatile boolean interrupted;

// Locked at the start of the loop runnable in doStartThread() and unlocked in its last finally block.
private final Lock processingLock = new ReentrantLock();
// Counted down once the state has been set to ST_TERMINATED; awaited by awaitTermination().
private final CountDownLatch threadLock = new CountDownLatch(1);
// Hooks executed by runShutdownHooks(); a LinkedHashSet, so iteration follows insertion order.
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp;
private final int maxPendingTasks;
private final RejectedExecutionHandler rejectedExecutionHandler;
private final boolean supportSuspension;

// Value of getCurrentTimeNanos() recorded when tasks or shutdown hooks were last run.
private long lastExecutionTime;

@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;

// Graceful-shutdown parameters in nanoseconds; written by shutdown0(), read by confirmShutdown().
private volatile long gracefulShutdownQuietPeriod;
private volatile long gracefulShutdownTimeout;
// Assigned on the first confirmShutdown() call that gets past its guards; 0 until then.
private long gracefulShutdownStartTime;

// Completed by the loop runnable on termination, or failed by ensureThreadStarted().
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
106
107
108
109
110
111
112
113
114
/**
 * Creates a new instance that runs on a thread produced by the given {@link ThreadFactory}.
 * The factory is wrapped in a {@link ThreadPerTaskExecutor} and the call is forwarded to the
 * {@link Executor}-based constructor.
 *
 * @param parent         the {@link EventExecutorGroup} passed on to the delegate constructor
 * @param threadFactory  the {@link ThreadFactory} wrapped in a {@link ThreadPerTaskExecutor}
 * @param addTaskWakesUp passed through unchanged; when {@code true} this class skips its explicit
 *                       {@link #wakeup(boolean)} call after enqueueing a task
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
}
119
120
121
122
123
124
125
126
127
128
129
/**
 * Creates a new instance that runs on a thread produced by the given {@link ThreadFactory}, with an
 * explicit pending-task limit and rejection handler. The factory is wrapped in a
 * {@link ThreadPerTaskExecutor} and the call is forwarded to the {@link Executor}-based constructor.
 *
 * @param parent          the {@link EventExecutorGroup} passed on to the delegate constructor
 * @param threadFactory   the {@link ThreadFactory} wrapped in a {@link ThreadPerTaskExecutor}
 * @param addTaskWakesUp  passed through unchanged
 * @param maxPendingTasks requested limit of pending tasks, passed through unchanged
 * @param rejectedHandler the {@link RejectedExecutionHandler}, passed through unchanged
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory,
        boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
135
136
137
138
139
140
141
142
143
144
145
146
/**
 * Creates a new instance that runs on a thread produced by the given {@link ThreadFactory}, with
 * explicit suspension support, pending-task limit and rejection handler. The factory is wrapped in a
 * {@link ThreadPerTaskExecutor} and the call is forwarded to the {@link Executor}-based constructor.
 *
 * @param parent            the {@link EventExecutorGroup} passed on to the delegate constructor
 * @param threadFactory     the {@link ThreadFactory} wrapped in a {@link ThreadPerTaskExecutor}
 * @param addTaskWakesUp    passed through unchanged
 * @param supportSuspension passed through unchanged; {@link #trySuspend()} returns {@code false}
 *                          whenever this is {@code false}
 * @param maxPendingTasks   requested limit of pending tasks, passed through unchanged
 * @param rejectedHandler   the {@link RejectedExecutionHandler}, passed through unchanged
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory,
        boolean addTaskWakesUp, boolean supportSuspension,
        int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, supportSuspension,
            maxPendingTasks, rejectedHandler);
}
154
155
156
157
158
159
160
161
162
/**
 * Creates a new instance on top of the given {@link Executor}, using
 * {@code DEFAULT_MAX_PENDING_EXECUTOR_TASKS} as the pending-task limit and
 * {@link RejectedExecutionHandlers#reject()} as the rejection handler.
 *
 * @param parent         the {@link EventExecutorGroup} passed on to the delegate constructor
 * @param executor       the {@link Executor} used to start the executor thread
 * @param addTaskWakesUp passed through unchanged
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
    this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
}
166
167
168
169
170
171
172
173
174
175
176
/**
 * Creates a new instance on top of the given {@link Executor} with suspension support disabled
 * ({@code false} is passed for {@code supportSuspension}).
 *
 * @param parent          the {@link EventExecutorGroup} passed on to the delegate constructor
 * @param executor        the {@link Executor} used to start the executor thread
 * @param addTaskWakesUp  passed through unchanged
 * @param maxPendingTasks requested limit of pending tasks, passed through unchanged
 * @param rejectedHandler the {@link RejectedExecutionHandler}, passed through unchanged
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    this(parent, executor, addTaskWakesUp, false, maxPendingTasks, rejectedHandler);
}
182
183
184
185
186
187
188
189
190
191
192
193
/**
 * Creates a new instance whose task queue is built by {@link #newTaskQueue(int)}.
 *
 * @param parent            the {@link EventExecutorGroup} handed to the superclass constructor
 * @param executor          the {@link Executor} used to start the executor thread; it is first passed
 *                          through {@code ThreadExecutorMap.apply(executor, this)}
 * @param addTaskWakesUp    when {@code true} this class skips its explicit {@link #wakeup(boolean)}
 *                          call after enqueueing a task
 * @param supportSuspension whether {@link #trySuspend()} may suspend this executor
 * @param maxPendingTasks   requested limit of pending tasks; values below 16 are raised to 16
 * @param rejectedHandler   the {@link RejectedExecutionHandler}; must not be {@code null}
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, boolean supportSuspension,
                                    int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.supportSuspension = supportSuspension;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ThreadExecutorMap.apply(executor, this);
    // newTaskQueue(int) is overridable, so it runs only after the fields above have been assigned.
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
205
/**
 * Creates a new instance that uses a caller-supplied task queue, with suspension support disabled
 * ({@code false} is passed for {@code supportSuspension}).
 *
 * @param parent          the {@link EventExecutorGroup} passed on to the delegate constructor
 * @param executor        the {@link Executor} used to start the executor thread
 * @param addTaskWakesUp  passed through unchanged
 * @param taskQueue       the queue that will hold pending tasks, passed through unchanged
 * @param rejectedHandler the {@link RejectedExecutionHandler}, passed through unchanged
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                    RejectedExecutionHandler rejectedHandler) {
    this(parent, executor, addTaskWakesUp, false, taskQueue, rejectedHandler);
}
211
/**
 * Creates a new instance that uses a caller-supplied task queue. {@code maxPendingTasks} is set to
 * {@code DEFAULT_MAX_PENDING_EXECUTOR_TASKS}; {@link #newTaskQueue(int)} is not called.
 *
 * @param parent            the {@link EventExecutorGroup} handed to the superclass constructor
 * @param executor          the {@link Executor} used to start the executor thread; it is first passed
 *                          through {@code ThreadExecutorMap.apply(executor, this)}
 * @param addTaskWakesUp    when {@code true} this class skips its explicit {@link #wakeup(boolean)}
 *                          call after enqueueing a task
 * @param supportSuspension whether {@link #trySuspend()} may suspend this executor
 * @param taskQueue         the queue that will hold pending tasks; must not be {@code null}
 * @param rejectedHandler   the {@link RejectedExecutionHandler}; must not be {@code null}
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, boolean supportSuspension,
                                    Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.supportSuspension = supportSuspension;
    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    this.executor = ThreadExecutorMap.apply(executor, this);
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
223
224
225
226
227 @Deprecated
228 protected Queue<Runnable> newTaskQueue() {
229 return newTaskQueue(maxPendingTasks);
230 }
231
232
233
234
235
236
237
238 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
239 return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
240 }
241
242
243
244
245 protected void interruptThread() {
246 Thread currentThread = thread;
247 if (currentThread == null) {
248 interrupted = true;
249 } else {
250 currentThread.interrupt();
251 }
252 }
253
254
255
256
257 protected Runnable pollTask() {
258 assert inEventLoop();
259 return pollTaskFrom(taskQueue);
260 }
261
262 protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
263 for (;;) {
264 Runnable task = taskQueue.poll();
265 if (task != WAKEUP_TASK) {
266 return task;
267 }
268 }
269 }
270
271
272
273
274
275
276
277
278
279
280 protected Runnable takeTask() {
281 assert inEventLoop();
282 if (!(taskQueue instanceof BlockingQueue)) {
283 throw new UnsupportedOperationException();
284 }
285
286 BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
287 for (;;) {
288 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
289 if (scheduledTask == null) {
290 Runnable task = null;
291 try {
292 task = taskQueue.take();
293 if (task == WAKEUP_TASK) {
294 task = null;
295 }
296 } catch (InterruptedException e) {
297
298 }
299 return task;
300 } else {
301 long delayNanos = scheduledTask.delayNanos();
302 Runnable task = null;
303 if (delayNanos > 0) {
304 try {
305 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
306 } catch (InterruptedException e) {
307
308 return null;
309 }
310 }
311 if (task == null) {
312
313
314
315
316 fetchFromScheduledTaskQueue();
317 task = taskQueue.poll();
318 }
319
320 if (task != null) {
321 if (task == WAKEUP_TASK) {
322 return null;
323 }
324 return task;
325 }
326 }
327 }
328 }
329
330 private boolean fetchFromScheduledTaskQueue() {
331 return fetchFromScheduledTaskQueue(taskQueue);
332 }
333
334
335
336
337 private boolean executeExpiredScheduledTasks() {
338 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
339 return false;
340 }
341 long nanoTime = getCurrentTimeNanos();
342 Runnable scheduledTask = pollScheduledTask(nanoTime);
343 if (scheduledTask == null) {
344 return false;
345 }
346 do {
347 safeExecute(scheduledTask);
348 } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
349 return true;
350 }
351
352
353
354
355 protected Runnable peekTask() {
356 assert inEventLoop();
357 return taskQueue.peek();
358 }
359
360
361
362
363 protected boolean hasTasks() {
364 assert inEventLoop();
365 return !taskQueue.isEmpty();
366 }
367
368
369
370
371 public int pendingTasks() {
372 return taskQueue.size();
373 }
374
375
376
377
378
379 protected void addTask(Runnable task) {
380 ObjectUtil.checkNotNull(task, "task");
381 if (!offerTask(task)) {
382 reject(task);
383 }
384 }
385
386 final boolean offerTask(Runnable task) {
387 if (isShutdown()) {
388 reject();
389 }
390 return taskQueue.offer(task);
391 }
392
393
394
395
396 protected boolean removeTask(Runnable task) {
397 return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
398 }
399
400
401
402
403
404
405 protected boolean runAllTasks() {
406 assert inEventLoop();
407 boolean fetchedAll;
408 boolean ranAtLeastOne = false;
409
410 do {
411 fetchedAll = fetchFromScheduledTaskQueue(taskQueue);
412 if (runAllTasksFrom(taskQueue)) {
413 ranAtLeastOne = true;
414 }
415 } while (!fetchedAll);
416
417 if (ranAtLeastOne) {
418 lastExecutionTime = getCurrentTimeNanos();
419 }
420 afterRunningAllTasks();
421 return ranAtLeastOne;
422 }
423
424
425
426
427
428
429
430
431
432 protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
433 assert inEventLoop();
434 boolean ranAtLeastOneTask;
435 int drainAttempt = 0;
436 do {
437
438
439 ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
440 } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
441
442 if (drainAttempt > 0) {
443 lastExecutionTime = getCurrentTimeNanos();
444 }
445 afterRunningAllTasks();
446
447 return drainAttempt > 0;
448 }
449
450
451
452
453
454
455
456
457 protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
458 Runnable task = pollTaskFrom(taskQueue);
459 if (task == null) {
460 return false;
461 }
462 for (;;) {
463 safeExecute(task);
464 task = pollTaskFrom(taskQueue);
465 if (task == null) {
466 return true;
467 }
468 }
469 }
470
471
472
473
474
475
476 private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
477 Runnable task = pollTaskFrom(taskQueue);
478 if (task == null) {
479 return false;
480 }
481 int remaining = Math.min(maxPendingTasks, taskQueue.size());
482 safeExecute(task);
483
484
485 while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
486 safeExecute(task);
487 }
488 return true;
489 }
490
491
492
493
494
495 protected boolean runAllTasks(long timeoutNanos) {
496 fetchFromScheduledTaskQueue(taskQueue);
497 Runnable task = pollTask();
498 if (task == null) {
499 afterRunningAllTasks();
500 return false;
501 }
502
503 final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
504 long runTasks = 0;
505 long lastExecutionTime;
506 for (;;) {
507 safeExecute(task);
508
509 runTasks ++;
510
511
512
513 if ((runTasks & 0x3F) == 0) {
514 lastExecutionTime = getCurrentTimeNanos();
515 if (lastExecutionTime >= deadline) {
516 break;
517 }
518 }
519
520 task = pollTask();
521 if (task == null) {
522 lastExecutionTime = getCurrentTimeNanos();
523 break;
524 }
525 }
526
527 afterRunningAllTasks();
528 this.lastExecutionTime = lastExecutionTime;
529 return true;
530 }
531
532
533
534
/**
 * Hook invoked at the end of {@link #runAllTasks()}, {@link #runAllTasks(long)} and
 * {@link #runScheduledAndExecutorTasks(int)}. The default implementation does nothing.
 */
protected void afterRunningAllTasks() { }
536
537
538
539
540 protected long delayNanos(long currentTimeNanos) {
541 currentTimeNanos -= initialNanoTime();
542
543 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
544 if (scheduledTask == null) {
545 return SCHEDULE_PURGE_INTERVAL;
546 }
547
548 return scheduledTask.delayNanos(currentTimeNanos);
549 }
550
551
552
553
554
555 protected long deadlineNanos() {
556 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
557 if (scheduledTask == null) {
558 return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
559 }
560 return scheduledTask.deadlineNanos();
561 }
562
563
564
565
566
567
568
569
570 protected void updateLastExecutionTime() {
571 lastExecutionTime = getCurrentTimeNanos();
572 }
573
574
575
576
/**
 * The body of the executor: invoked on the executor thread by the runnable that
 * {@code doStartThread()} submits. When it returns without the executor being suspended, the
 * shutdown sequence starts; an error is logged if {@link #confirmShutdown()} was never called
 * before it returned normally.
 */
protected abstract void run();
578
579
580
581
/**
 * Hook invoked on the executor thread during termination, after the final
 * {@link #confirmShutdown()} pass and before the state becomes terminated.
 * The default implementation does nothing.
 */
protected void cleanup() {
    // NOOP
}
585
586 protected void wakeup(boolean inEventLoop) {
587 if (!inEventLoop) {
588
589
590 taskQueue.offer(WAKEUP_TASK);
591 }
592 }
593
594 @Override
595 public boolean inEventLoop(Thread thread) {
596 return thread == this.thread;
597 }
598
599
600
601
602 public void addShutdownHook(final Runnable task) {
603 if (inEventLoop()) {
604 shutdownHooks.add(task);
605 } else {
606 execute(new Runnable() {
607 @Override
608 public void run() {
609 shutdownHooks.add(task);
610 }
611 });
612 }
613 }
614
615
616
617
618 public void removeShutdownHook(final Runnable task) {
619 if (inEventLoop()) {
620 shutdownHooks.remove(task);
621 } else {
622 execute(new Runnable() {
623 @Override
624 public void run() {
625 shutdownHooks.remove(task);
626 }
627 });
628 }
629 }
630
631 private boolean runShutdownHooks() {
632 boolean ran = false;
633
634 while (!shutdownHooks.isEmpty()) {
635 List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
636 shutdownHooks.clear();
637 for (Runnable task: copy) {
638 try {
639 runTask(task);
640 } catch (Throwable t) {
641 logger.warn("Shutdown hook raised an exception.", t);
642 } finally {
643 ran = true;
644 }
645 }
646 }
647
648 if (ran) {
649 lastExecutionTime = getCurrentTimeNanos();
650 }
651
652 return ran;
653 }
654
655 private void shutdown0(long quietPeriod, long timeout, int shutdownState) {
656 if (isShuttingDown()) {
657 return;
658 }
659
660 boolean inEventLoop = inEventLoop();
661 boolean wakeup;
662 int oldState;
663 for (;;) {
664 if (isShuttingDown()) {
665 return;
666 }
667 int newState;
668 wakeup = true;
669 oldState = state;
670 if (inEventLoop) {
671 newState = shutdownState;
672 } else {
673 switch (oldState) {
674 case ST_NOT_STARTED:
675 case ST_STARTED:
676 case ST_SUSPENDING:
677 case ST_SUSPENDED:
678 newState = shutdownState;
679 break;
680 default:
681 newState = oldState;
682 wakeup = false;
683 }
684 }
685 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
686 break;
687 }
688 }
689 if (quietPeriod != -1) {
690 gracefulShutdownQuietPeriod = quietPeriod;
691 }
692 if (timeout != -1) {
693 gracefulShutdownTimeout = timeout;
694 }
695
696 if (ensureThreadStarted(oldState)) {
697 return;
698 }
699
700 if (wakeup) {
701 taskQueue.offer(WAKEUP_TASK);
702 if (!addTaskWakesUp) {
703 wakeup(inEventLoop);
704 }
705 }
706 }
707
708 @Override
709 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
710 ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
711 if (timeout < quietPeriod) {
712 throw new IllegalArgumentException(
713 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
714 }
715 ObjectUtil.checkNotNull(unit, "unit");
716
717 shutdown0(unit.toNanos(quietPeriod), unit.toNanos(timeout), ST_SHUTTING_DOWN);
718 return terminationFuture();
719 }
720
721 @Override
722 public Future<?> terminationFuture() {
723 return terminationFuture;
724 }
725
726 @Override
727 @Deprecated
728 public void shutdown() {
729 shutdown0(-1, -1, ST_SHUTDOWN);
730 }
731
732 @Override
733 public boolean isShuttingDown() {
734 return state >= ST_SHUTTING_DOWN;
735 }
736
737 @Override
738 public boolean isShutdown() {
739 return state >= ST_SHUTDOWN;
740 }
741
742 @Override
743 public boolean isTerminated() {
744 return state == ST_TERMINATED;
745 }
746
747 @Override
748 public boolean isSuspended() {
749 int currentState = state;
750 return currentState == ST_SUSPENDED || currentState == ST_SUSPENDING;
751 }
752
753 @Override
754 public boolean trySuspend() {
755 if (supportSuspension) {
756 if (STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_SUSPENDING)) {
757 wakeup(inEventLoop());
758 return true;
759 }
760 int currentState = state;
761 return currentState == ST_SUSPENDED || currentState == ST_SUSPENDING;
762 }
763 return false;
764 }
765
766
767
768
769
770
771
772 protected boolean canSuspend() {
773 return canSuspend(state);
774 }
775
776
777
778
779
780
781
782
783
784
785 protected boolean canSuspend(int state) {
786 assert inEventLoop();
787 return supportSuspension && (state == ST_SUSPENDED || state == ST_SUSPENDING)
788 && !hasTasks() && nextScheduledTaskDeadlineNanos() == -1;
789 }
790
791
792
793
794 protected boolean confirmShutdown() {
795 if (!isShuttingDown()) {
796 return false;
797 }
798
799 if (!inEventLoop()) {
800 throw new IllegalStateException("must be invoked from an event loop");
801 }
802
803 cancelScheduledTasks();
804
805 if (gracefulShutdownStartTime == 0) {
806 gracefulShutdownStartTime = getCurrentTimeNanos();
807 }
808
809 if (runAllTasks() || runShutdownHooks()) {
810 if (isShutdown()) {
811
812 return true;
813 }
814
815
816
817
818 if (gracefulShutdownQuietPeriod == 0) {
819 return true;
820 }
821 taskQueue.offer(WAKEUP_TASK);
822 return false;
823 }
824
825 final long nanoTime = getCurrentTimeNanos();
826
827 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
828 return true;
829 }
830
831 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
832
833
834 taskQueue.offer(WAKEUP_TASK);
835 try {
836 Thread.sleep(100);
837 } catch (InterruptedException e) {
838
839 }
840
841 return false;
842 }
843
844
845
846 return true;
847 }
848
849 @Override
850 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
851 ObjectUtil.checkNotNull(unit, "unit");
852 if (inEventLoop()) {
853 throw new IllegalStateException("cannot await termination of the current thread");
854 }
855
856 threadLock.await(timeout, unit);
857
858 return isTerminated();
859 }
860
/**
 * Enqueues {@code task} for execution. Delegates to {@code execute0}, which rejects {@code null}
 * and asks {@link #wakesUpForTask(Runnable)} whether the executor should be woken up.
 *
 * @param task the task to run
 */
@Override
public void execute(Runnable task) {
    execute0(task);
}
865
/**
 * Enqueues {@code task} without requesting a wakeup of the executor. Delegates to
 * {@code lazyExecute0}, which rejects {@code null}.
 *
 * @param task the task to run
 */
@Override
public void lazyExecute(Runnable task) {
    lazyExecute0(task);
}
870
871 private void execute0(@Schedule Runnable task) {
872 ObjectUtil.checkNotNull(task, "task");
873 execute(task, wakesUpForTask(task));
874 }
875
876 private void lazyExecute0(@Schedule Runnable task) {
877 execute(ObjectUtil.checkNotNull(task, "task"), false);
878 }
879
880 @Override
881 void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
882 ObjectUtil.checkNotNull(task, "task");
883 int currentState = state;
884 if (supportSuspension && currentState == ST_SUSPENDED) {
885
886
887
888 execute(new Runnable() {
889 @Override
890 public void run() {
891 task.run();
892 if (canSuspend(ST_SUSPENDED)) {
893
894
895 trySuspend();
896 }
897 }
898 }, true);
899 } else {
900
901 execute(task, false);
902 }
903 }
904
905 private void execute(Runnable task, boolean immediate) {
906 boolean inEventLoop = inEventLoop();
907 addTask(task);
908 if (!inEventLoop) {
909 startThread();
910 if (isShutdown()) {
911 boolean reject = false;
912 try {
913 if (removeTask(task)) {
914 reject = true;
915 }
916 } catch (UnsupportedOperationException e) {
917
918
919
920 }
921 if (reject) {
922 reject();
923 }
924 }
925 }
926
927 if (!addTaskWakesUp && immediate) {
928 wakeup(inEventLoop);
929 }
930 }
931
932 @Override
933 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
934 throwIfInEventLoop("invokeAny");
935 return super.invokeAny(tasks);
936 }
937
938 @Override
939 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
940 throws InterruptedException, ExecutionException, TimeoutException {
941 throwIfInEventLoop("invokeAny");
942 return super.invokeAny(tasks, timeout, unit);
943 }
944
945 @Override
946 public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
947 throws InterruptedException {
948 throwIfInEventLoop("invokeAll");
949 return super.invokeAll(tasks);
950 }
951
952 @Override
953 public <T> List<java.util.concurrent.Future<T>> invokeAll(
954 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
955 throwIfInEventLoop("invokeAll");
956 return super.invokeAll(tasks, timeout, unit);
957 }
958
959 private void throwIfInEventLoop(String method) {
960 if (inEventLoop()) {
961 throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
962 }
963 }
964
965
966
967
968
969
970 public final ThreadProperties threadProperties() {
971 ThreadProperties threadProperties = this.threadProperties;
972 if (threadProperties == null) {
973 Thread thread = this.thread;
974 if (thread == null) {
975 assert !inEventLoop();
976 submit(NOOP_TASK).syncUninterruptibly();
977 thread = this.thread;
978 assert thread != null;
979 }
980
981 threadProperties = new DefaultThreadProperties(thread);
982 if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
983 threadProperties = this.threadProperties;
984 }
985 }
986
987 return threadProperties;
988 }
989
990
991
992
/**
 * Marker interface that extends {@link LazyRunnable} without adding any members.
 *
 * @deprecated kept only as a deprecated alias of {@link LazyRunnable}.
 */
@Deprecated
protected interface NonWakeupRunnable extends LazyRunnable { }
995
996
997
998
999
/**
 * Consulted by {@link #execute(Runnable)} to decide whether enqueueing {@code task} should wake
 * the executor. This default implementation always answers {@code true}; subclasses may override it.
 *
 * @param task the task that is about to be enqueued
 * @return {@code true} if the executor should be woken up for this task
 */
protected boolean wakesUpForTask(Runnable task) {
    return true;
}
1003
1004 protected static void reject() {
1005 throw new RejectedExecutionException("event executor terminated");
1006 }
1007
1008
1009
1010
1011
1012
/**
 * Hands {@code task} to the configured {@link RejectedExecutionHandler}, together with this
 * executor. What happens next (for example, throwing) is up to the handler.
 *
 * @param task the task that could not be enqueued
 */
protected final void reject(Runnable task) {
    rejectedExecutionHandler.rejected(task, this);
}
1016
1017
1018
// One second in nanoseconds; the fallback used by delayNanos(long) and deadlineNanos() when no
// scheduled task is pending.
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
1020
1021 private void startThread() {
1022 int currentState = state;
1023 if (currentState == ST_NOT_STARTED || currentState == ST_SUSPENDED) {
1024 if (STATE_UPDATER.compareAndSet(this, currentState, ST_STARTED)) {
1025 boolean success = false;
1026 try {
1027 doStartThread();
1028 success = true;
1029 } finally {
1030 if (!success) {
1031 STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
1032 }
1033 }
1034 }
1035 }
1036 }
1037
1038 private boolean ensureThreadStarted(int oldState) {
1039 if (oldState == ST_NOT_STARTED || oldState == ST_SUSPENDED) {
1040 try {
1041 doStartThread();
1042 } catch (Throwable cause) {
1043 STATE_UPDATER.set(this, ST_TERMINATED);
1044 terminationFuture.tryFailure(cause);
1045
1046 if (!(cause instanceof Exception)) {
1047
1048 PlatformDependent.throwException(cause);
1049 }
1050 return true;
1051 }
1052 }
1053 return false;
1054 }
1055
/**
 * Submits the loop runnable to {@code executor}. The runnable binds the executing thread to this
 * executor, calls {@link #run()} and then either suspends (leaving the executor restartable) or
 * performs the full shutdown sequence: remaining tasks and hooks, {@link #cleanup()}, terminal
 * state, and completion of {@code terminationFuture}.
 */
private void doStartThread() {
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // Held until the very end of this runnable (released in the innermost finally below).
            processingLock.lock();
            assert thread == null;
            thread = Thread.currentThread();
            // Apply an interrupt that was requested through interruptThread() before a thread existed.
            if (interrupted) {
                thread.interrupt();
                interrupted = false;
            }
            boolean success = false;
            Throwable unexpectedException = null;
            updateLastExecutionTime();
            boolean suspend = false;
            try {
                for (;;) {
                    SingleThreadEventExecutor.this.run();
                    success = true;

                    int currentState = state;
                    if (canSuspend(currentState)) {
                        if (!STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
                                ST_SUSPENDING, ST_SUSPENDED)) {
                            // The CAS failed, so call run() again.
                            continue;
                        }

                        if (!canSuspend(ST_SUSPENDED) && STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
                                ST_SUSPENDED, ST_STARTED)) {
                            // Suspension is no longer possible (canSuspend is false again) and this
                            // thread managed to switch back to ST_STARTED, so it keeps running.
                            continue;
                        }
                        suspend = true;
                    }
                    break;
                }
            } catch (Throwable t) {
                unexpectedException = t;
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // Leaving the loop without suspending means the executor is going away.
                boolean shutdown = !suspend;
                if (shutdown) {
                    for (;;) {
                        // Re-read the state on every attempt; it may change concurrently.
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }
                    // gracefulShutdownStartTime is only assigned by confirmShutdown(); 0 here means
                    // run() returned normally without ever calling it.
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }
                }

                try {
                    if (shutdown) {
                        // Phase 1: keep running remaining tasks and shutdown hooks until
                        // confirmShutdown() allows termination. The state is still below
                        // ST_SHUTDOWN here, so offerTask() does not reject new tasks yet.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }

                        // Phase 2: switch to ST_SHUTDOWN; from now on offerTask() rejects new tasks.
                        for (;;) {
                            int currentState = state;
                            if (currentState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, currentState, ST_SHUTDOWN)) {
                                break;
                            }
                        }

                        // Phase 3: one final pass over whatever was enqueued before the switch.
                        confirmShutdown();
                    }
                } finally {
                    try {
                        if (shutdown) {
                            try {
                                cleanup();
                            } finally {
                                // Drop this thread's FastThreadLocals before the termination
                                // future is completed below.
                                FastThreadLocal.removeAll();

                                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                threadLock.countDown();
                                // Count (and discard) tasks that never got to run.
                                int numUserTasks = drainTasks();
                                if (numUserTasks > 0 && logger.isWarnEnabled()) {
                                    logger.warn("An event executor terminated with " +
                                            "non-empty task queue (" + numUserTasks + ')');
                                }
                                if (unexpectedException == null) {
                                    terminationFuture.setSuccess(null);
                                } else {
                                    terminationFuture.setFailure(unexpectedException);
                                }
                            }
                        } else {
                            // Suspending: this thread is released, so drop its FastThreadLocals.
                            FastThreadLocal.removeAll();

                            // The cached properties describe the thread being released; clear them
                            // so threadProperties() rebuilds them later.
                            threadProperties = null;
                        }
                    } finally {
                        thread = null;
                        // Release the lock taken at the top of this runnable.
                        processingLock.unlock();
                    }
                }
            }
        }
    });
}
1185
1186 final int drainTasks() {
1187 int numTasks = 0;
1188 for (;;) {
1189 Runnable runnable = taskQueue.poll();
1190 if (runnable == null) {
1191 break;
1192 }
1193
1194
1195 if (WAKEUP_TASK != runnable) {
1196 numTasks++;
1197 }
1198 }
1199 return numTasks;
1200 }
1201
1202 private static final class DefaultThreadProperties implements ThreadProperties {
1203 private final Thread t;
1204
1205 DefaultThreadProperties(Thread t) {
1206 this.t = t;
1207 }
1208
1209 @Override
1210 public State state() {
1211 return t.getState();
1212 }
1213
1214 @Override
1215 public int priority() {
1216 return t.getPriority();
1217 }
1218
1219 @Override
1220 public boolean isInterrupted() {
1221 return t.isInterrupted();
1222 }
1223
1224 @Override
1225 public boolean isDaemon() {
1226 return t.isDaemon();
1227 }
1228
1229 @Override
1230 public String name() {
1231 return t.getName();
1232 }
1233
1234 @Override
1235 public long id() {
1236 return t.getId();
1237 }
1238
1239 @Override
1240 public StackTraceElement[] stackTrace() {
1241 return t.getStackTrace();
1242 }
1243
1244 @Override
1245 public boolean isAlive() {
1246 return t.isAlive();
1247 }
1248 }
1249 }