1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.SystemPropertyUtil;
21 import io.netty.util.internal.ThreadExecutorMap;
22 import io.netty.util.internal.logging.InternalLogger;
23 import io.netty.util.internal.logging.InternalLoggerFactory;
24 import org.jetbrains.annotations.Async.Schedule;
25
26 import java.lang.Thread.State;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.LinkedHashSet;
30 import java.util.List;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
45 import java.util.concurrent.locks.Lock;
46 import java.util.concurrent.locks.ReentrantLock;
47
48
49
50
51
52 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
53
54 static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
55 SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
56
57 private static final InternalLogger logger =
58 InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
59
60 private static final int ST_NOT_STARTED = 1;
61 private static final int ST_SUSPENDING = 2;
62 private static final int ST_SUSPENDED = 3;
63 private static final int ST_STARTED = 4;
64 private static final int ST_SHUTTING_DOWN = 5;
65 private static final int ST_SHUTDOWN = 6;
66 private static final int ST_TERMINATED = 7;
67
68 private static final Runnable NOOP_TASK = new Runnable() {
69 @Override
70 public void run() {
71
72 }
73 };
74
75 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
76 AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
77 private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
78 AtomicReferenceFieldUpdater.newUpdater(
79 SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
80 private final Queue<Runnable> taskQueue;
81
82 private volatile Thread thread;
83 @SuppressWarnings("unused")
84 private volatile ThreadProperties threadProperties;
85 private final Executor executor;
86 private volatile boolean interrupted;
87
88 private final Lock processingLock = new ReentrantLock();
89 private final CountDownLatch threadLock = new CountDownLatch(1);
90 private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
91 private final boolean addTaskWakesUp;
92 private final int maxPendingTasks;
93 private final RejectedExecutionHandler rejectedExecutionHandler;
94 private final boolean supportSuspension;
95
96 private long lastExecutionTime;
97
98 @SuppressWarnings({ "FieldMayBeFinal", "unused" })
99 private volatile int state = ST_NOT_STARTED;
100
101 private volatile long gracefulShutdownQuietPeriod;
102 private volatile long gracefulShutdownTimeout;
103 private long gracefulShutdownStartTime;
104
105 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
106
107
108
109
110
111
112
113
114
/**
 * Creates an executor whose thread is obtained from {@code threadFactory}; the factory
 * is wrapped in a {@link ThreadPerTaskExecutor} and the work is delegated to the
 * {@link Executor}-based constructor.
 *
 * @param parent         the {@link EventExecutorGroup} this executor belongs to
 * @param threadFactory  factory for the thread that will run this executor
 * @param addTaskWakesUp {@code true} if adding a task is by itself enough to wake the thread
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        ThreadFactory threadFactory,
        boolean addTaskWakesUp) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
}
119
120
121
122
123
124
125
126
127
128
129
/**
 * Creates an executor whose thread is obtained from {@code threadFactory}, with an
 * explicit pending-task bound and rejection handler.
 *
 * @param parent          the {@link EventExecutorGroup} this executor belongs to
 * @param threadFactory   factory for the thread that will run this executor
 * @param addTaskWakesUp  {@code true} if adding a task is by itself enough to wake the thread
 * @param maxPendingTasks requested bound on queued tasks
 * @param rejectedHandler handler invoked when a task cannot be queued
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        ThreadFactory threadFactory,
        boolean addTaskWakesUp,
        int maxPendingTasks,
        RejectedExecutionHandler rejectedHandler) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
135
136
137
138
139
140
141
142
143
144
145
146
/**
 * Creates an executor whose thread is obtained from {@code threadFactory}, with
 * explicit suspension support, pending-task bound and rejection handler.
 *
 * @param parent            the {@link EventExecutorGroup} this executor belongs to
 * @param threadFactory     factory for the thread that will run this executor
 * @param addTaskWakesUp    {@code true} if adding a task is by itself enough to wake the thread
 * @param supportSuspension {@code true} if this executor may be suspended
 * @param maxPendingTasks   requested bound on queued tasks
 * @param rejectedHandler   handler invoked when a task cannot be queued
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        ThreadFactory threadFactory,
        boolean addTaskWakesUp,
        boolean supportSuspension,
        int maxPendingTasks,
        RejectedExecutionHandler rejectedHandler) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, supportSuspension,
            maxPendingTasks, rejectedHandler);
}
154
155
156
157
158
159
160
161
162
/**
 * Creates an executor that runs on {@code executor}, using
 * {@link #DEFAULT_MAX_PENDING_EXECUTOR_TASKS} as the queue bound and
 * {@link RejectedExecutionHandlers#reject()} as the rejection handler.
 *
 * @param parent         the {@link EventExecutorGroup} this executor belongs to
 * @param executor       the {@link Executor} that supplies the driving thread
 * @param addTaskWakesUp {@code true} if adding a task is by itself enough to wake the thread
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        Executor executor,
        boolean addTaskWakesUp) {
    this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
}
166
167
168
169
170
171
172
173
174
175
176
/**
 * Creates an executor that runs on {@code executor} with suspension support disabled.
 *
 * @param parent          the {@link EventExecutorGroup} this executor belongs to
 * @param executor        the {@link Executor} that supplies the driving thread
 * @param addTaskWakesUp  {@code true} if adding a task is by itself enough to wake the thread
 * @param maxPendingTasks requested bound on queued tasks
 * @param rejectedHandler handler invoked when a task cannot be queued
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        Executor executor,
        boolean addTaskWakesUp,
        int maxPendingTasks,
        RejectedExecutionHandler rejectedHandler) {
    this(parent, executor, addTaskWakesUp, false, maxPendingTasks, rejectedHandler);
}
182
183
184
185
186
187
188
189
190
191
192
193
194 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
195 boolean addTaskWakesUp, boolean supportSuspension,
196 int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
197 super(parent);
198 this.addTaskWakesUp = addTaskWakesUp;
199 this.supportSuspension = supportSuspension;
200 this.maxPendingTasks = Math.max(16, maxPendingTasks);
201 this.executor = ThreadExecutorMap.apply(executor, this);
202 taskQueue = newTaskQueue(this.maxPendingTasks);
203 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
204 }
205
/**
 * Creates an executor that runs on {@code executor} and uses the supplied task queue,
 * with suspension support disabled.
 *
 * @param parent          the {@link EventExecutorGroup} this executor belongs to
 * @param executor        the {@link Executor} that supplies the driving thread
 * @param addTaskWakesUp  {@code true} if adding a task is by itself enough to wake the thread
 * @param taskQueue       the queue that will hold pending tasks
 * @param rejectedHandler handler invoked when a task cannot be queued
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        Executor executor,
        boolean addTaskWakesUp,
        Queue<Runnable> taskQueue,
        RejectedExecutionHandler rejectedHandler) {
    this(parent, executor, addTaskWakesUp, false, taskQueue, rejectedHandler);
}
211
/**
 * Creates an executor that runs on {@code executor} and uses the supplied task queue.
 * {@code maxPendingTasks} is set to {@link #DEFAULT_MAX_PENDING_EXECUTOR_TASKS} in this case.
 *
 * @param parent            the {@link EventExecutorGroup} this executor belongs to
 * @param executor          the {@link Executor} that supplies the driving thread
 * @param addTaskWakesUp    {@code true} if adding a task is by itself enough to wake the thread
 * @param supportSuspension {@code true} if this executor may be suspended
 * @param taskQueue         the queue that will hold pending tasks; must not be {@code null}
 * @param rejectedHandler   handler invoked when a task cannot be queued; must not be {@code null}
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent,
        Executor executor,
        boolean addTaskWakesUp,
        boolean supportSuspension,
        Queue<Runnable> taskQueue,
        RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.supportSuspension = supportSuspension;
    maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    this.executor = ThreadExecutorMap.apply(executor, this);
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
223
224
225
226
227 @Deprecated
228 protected Queue<Runnable> newTaskQueue() {
229 return newTaskQueue(maxPendingTasks);
230 }
231
232
233
234
235
236
237
238 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
239 return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
240 }
241
242
243
244
245 protected void interruptThread() {
246 Thread currentThread = thread;
247 if (currentThread == null) {
248 interrupted = true;
249 } else {
250 currentThread.interrupt();
251 }
252 }
253
254
255
256
257 protected Runnable pollTask() {
258 assert inEventLoop();
259 return pollTaskFrom(taskQueue);
260 }
261
262 protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
263 for (;;) {
264 Runnable task = taskQueue.poll();
265 if (task != WAKEUP_TASK) {
266 return task;
267 }
268 }
269 }
270
271
272
273
274
275
276
277
278
279
280 protected Runnable takeTask() {
281 assert inEventLoop();
282 if (!(taskQueue instanceof BlockingQueue)) {
283 throw new UnsupportedOperationException();
284 }
285
286 BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
287 for (;;) {
288 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
289 if (scheduledTask == null) {
290 Runnable task = null;
291 try {
292 task = taskQueue.take();
293 if (task == WAKEUP_TASK) {
294 task = null;
295 }
296 } catch (InterruptedException e) {
297
298 }
299 return task;
300 } else {
301 long delayNanos = scheduledTask.delayNanos();
302 Runnable task = null;
303 if (delayNanos > 0) {
304 try {
305 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
306 } catch (InterruptedException e) {
307
308 return null;
309 }
310 }
311 if (task == null) {
312
313
314
315
316 fetchFromScheduledTaskQueue();
317 task = taskQueue.poll();
318 }
319
320 if (task != null) {
321 if (task == WAKEUP_TASK) {
322 return null;
323 }
324 return task;
325 }
326 }
327 }
328 }
329
330 private boolean fetchFromScheduledTaskQueue() {
331 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
332 return true;
333 }
334 long nanoTime = getCurrentTimeNanos();
335 for (;;) {
336 Runnable scheduledTask = pollScheduledTask(nanoTime);
337 if (scheduledTask == null) {
338 return true;
339 }
340 if (!taskQueue.offer(scheduledTask)) {
341
342 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
343 return false;
344 }
345 }
346 }
347
348
349
350
351 private boolean executeExpiredScheduledTasks() {
352 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
353 return false;
354 }
355 long nanoTime = getCurrentTimeNanos();
356 Runnable scheduledTask = pollScheduledTask(nanoTime);
357 if (scheduledTask == null) {
358 return false;
359 }
360 do {
361 safeExecute(scheduledTask);
362 } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
363 return true;
364 }
365
366
367
368
369 protected Runnable peekTask() {
370 assert inEventLoop();
371 return taskQueue.peek();
372 }
373
374
375
376
377 protected boolean hasTasks() {
378 assert inEventLoop();
379 return !taskQueue.isEmpty();
380 }
381
382
383
384
385 public int pendingTasks() {
386 return taskQueue.size();
387 }
388
389
390
391
392
393 protected void addTask(Runnable task) {
394 ObjectUtil.checkNotNull(task, "task");
395 if (!offerTask(task)) {
396 reject(task);
397 }
398 }
399
400 final boolean offerTask(Runnable task) {
401 if (isShutdown()) {
402 reject();
403 }
404 return taskQueue.offer(task);
405 }
406
407
408
409
410 protected boolean removeTask(Runnable task) {
411 return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
412 }
413
414
415
416
417
418
419 protected boolean runAllTasks() {
420 assert inEventLoop();
421 boolean fetchedAll;
422 boolean ranAtLeastOne = false;
423
424 do {
425 fetchedAll = fetchFromScheduledTaskQueue();
426 if (runAllTasksFrom(taskQueue)) {
427 ranAtLeastOne = true;
428 }
429 } while (!fetchedAll);
430
431 if (ranAtLeastOne) {
432 lastExecutionTime = getCurrentTimeNanos();
433 }
434 afterRunningAllTasks();
435 return ranAtLeastOne;
436 }
437
438
439
440
441
442
443
444
445
446 protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
447 assert inEventLoop();
448 boolean ranAtLeastOneTask;
449 int drainAttempt = 0;
450 do {
451
452
453 ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
454 } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
455
456 if (drainAttempt > 0) {
457 lastExecutionTime = getCurrentTimeNanos();
458 }
459 afterRunningAllTasks();
460
461 return drainAttempt > 0;
462 }
463
464
465
466
467
468
469
470
471 protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
472 Runnable task = pollTaskFrom(taskQueue);
473 if (task == null) {
474 return false;
475 }
476 for (;;) {
477 safeExecute(task);
478 task = pollTaskFrom(taskQueue);
479 if (task == null) {
480 return true;
481 }
482 }
483 }
484
485
486
487
488
489
490 private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
491 Runnable task = pollTaskFrom(taskQueue);
492 if (task == null) {
493 return false;
494 }
495 int remaining = Math.min(maxPendingTasks, taskQueue.size());
496 safeExecute(task);
497
498
499 while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
500 safeExecute(task);
501 }
502 return true;
503 }
504
505
506
507
508
509 protected boolean runAllTasks(long timeoutNanos) {
510 fetchFromScheduledTaskQueue();
511 Runnable task = pollTask();
512 if (task == null) {
513 afterRunningAllTasks();
514 return false;
515 }
516
517 final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
518 long runTasks = 0;
519 long lastExecutionTime;
520 for (;;) {
521 safeExecute(task);
522
523 runTasks ++;
524
525
526
527 if ((runTasks & 0x3F) == 0) {
528 lastExecutionTime = getCurrentTimeNanos();
529 if (lastExecutionTime >= deadline) {
530 break;
531 }
532 }
533
534 task = pollTask();
535 if (task == null) {
536 lastExecutionTime = getCurrentTimeNanos();
537 break;
538 }
539 }
540
541 afterRunningAllTasks();
542 this.lastExecutionTime = lastExecutionTime;
543 return true;
544 }
545
546
547
548
549 protected void afterRunningAllTasks() { }
550
551
552
553
554 protected long delayNanos(long currentTimeNanos) {
555 currentTimeNanos -= initialNanoTime();
556
557 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
558 if (scheduledTask == null) {
559 return SCHEDULE_PURGE_INTERVAL;
560 }
561
562 return scheduledTask.delayNanos(currentTimeNanos);
563 }
564
565
566
567
568
569 protected long deadlineNanos() {
570 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
571 if (scheduledTask == null) {
572 return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
573 }
574 return scheduledTask.deadlineNanos();
575 }
576
577
578
579
580
581
582
583
584 protected void updateLastExecutionTime() {
585 lastExecutionTime = getCurrentTimeNanos();
586 }
587
588
589
590
591 protected abstract void run();
592
593
594
595
596 protected void cleanup() {
597
598 }
599
600 protected void wakeup(boolean inEventLoop) {
601 if (!inEventLoop) {
602
603
604 taskQueue.offer(WAKEUP_TASK);
605 }
606 }
607
608 @Override
609 public boolean inEventLoop(Thread thread) {
610 return thread == this.thread;
611 }
612
613
614
615
616 public void addShutdownHook(final Runnable task) {
617 if (inEventLoop()) {
618 shutdownHooks.add(task);
619 } else {
620 execute(new Runnable() {
621 @Override
622 public void run() {
623 shutdownHooks.add(task);
624 }
625 });
626 }
627 }
628
629
630
631
632 public void removeShutdownHook(final Runnable task) {
633 if (inEventLoop()) {
634 shutdownHooks.remove(task);
635 } else {
636 execute(new Runnable() {
637 @Override
638 public void run() {
639 shutdownHooks.remove(task);
640 }
641 });
642 }
643 }
644
645 private boolean runShutdownHooks() {
646 boolean ran = false;
647
648 while (!shutdownHooks.isEmpty()) {
649 List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
650 shutdownHooks.clear();
651 for (Runnable task: copy) {
652 try {
653 runTask(task);
654 } catch (Throwable t) {
655 logger.warn("Shutdown hook raised an exception.", t);
656 } finally {
657 ran = true;
658 }
659 }
660 }
661
662 if (ran) {
663 lastExecutionTime = getCurrentTimeNanos();
664 }
665
666 return ran;
667 }
668
669 @Override
670 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
671 ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
672 if (timeout < quietPeriod) {
673 throw new IllegalArgumentException(
674 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
675 }
676 ObjectUtil.checkNotNull(unit, "unit");
677
678 if (isShuttingDown()) {
679 return terminationFuture();
680 }
681
682 boolean inEventLoop = inEventLoop();
683 boolean wakeup;
684 int oldState;
685 for (;;) {
686 if (isShuttingDown()) {
687 return terminationFuture();
688 }
689 int newState;
690 wakeup = true;
691 oldState = state;
692 if (inEventLoop) {
693 newState = ST_SHUTTING_DOWN;
694 } else {
695 switch (oldState) {
696 case ST_NOT_STARTED:
697 case ST_STARTED:
698 case ST_SUSPENDING:
699 case ST_SUSPENDED:
700 newState = ST_SHUTTING_DOWN;
701 break;
702 default:
703 newState = oldState;
704 wakeup = false;
705 }
706 }
707 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
708 break;
709 }
710 }
711 gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
712 gracefulShutdownTimeout = unit.toNanos(timeout);
713
714 if (ensureThreadStarted(oldState)) {
715 return terminationFuture;
716 }
717
718 if (wakeup) {
719 taskQueue.offer(WAKEUP_TASK);
720 if (!addTaskWakesUp) {
721 wakeup(inEventLoop);
722 }
723 }
724
725 return terminationFuture();
726 }
727
728 @Override
729 public Future<?> terminationFuture() {
730 return terminationFuture;
731 }
732
733 @Override
734 @Deprecated
735 public void shutdown() {
736 if (isShutdown()) {
737 return;
738 }
739
740 boolean inEventLoop = inEventLoop();
741 boolean wakeup;
742 int oldState;
743 for (;;) {
744 if (isShuttingDown()) {
745 return;
746 }
747 int newState;
748 wakeup = true;
749 oldState = state;
750 if (inEventLoop) {
751 newState = ST_SHUTDOWN;
752 } else {
753 switch (oldState) {
754 case ST_NOT_STARTED:
755 case ST_STARTED:
756 case ST_SUSPENDING:
757 case ST_SUSPENDED:
758 case ST_SHUTTING_DOWN:
759 newState = ST_SHUTDOWN;
760 break;
761 default:
762 newState = oldState;
763 wakeup = false;
764 }
765 }
766 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
767 break;
768 }
769 }
770
771 if (ensureThreadStarted(oldState)) {
772 return;
773 }
774
775 if (wakeup) {
776 taskQueue.offer(WAKEUP_TASK);
777 if (!addTaskWakesUp) {
778 wakeup(inEventLoop);
779 }
780 }
781 }
782
783 @Override
784 public boolean isShuttingDown() {
785 return state >= ST_SHUTTING_DOWN;
786 }
787
788 @Override
789 public boolean isShutdown() {
790 return state >= ST_SHUTDOWN;
791 }
792
793 @Override
794 public boolean isTerminated() {
795 return state == ST_TERMINATED;
796 }
797
798 @Override
799 public boolean isSuspended() {
800 int currentState = state;
801 return currentState == ST_SUSPENDED || currentState == ST_SUSPENDING;
802 }
803
804 @Override
805 public boolean trySuspend() {
806 if (supportSuspension) {
807 if (STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_SUSPENDING)) {
808 wakeup(inEventLoop());
809 return true;
810 }
811 int currentState = state;
812 return currentState == ST_SUSPENDED || currentState == ST_SUSPENDING;
813 }
814 return false;
815 }
816
817
818
819
820
821
822
823 protected boolean canSuspend() {
824 return canSuspend(state);
825 }
826
827
828
829
830
831
832
833
834
835
836 protected boolean canSuspend(int state) {
837 assert inEventLoop();
838 return supportSuspension && (state == ST_SUSPENDED || state == ST_SUSPENDING)
839 && !hasTasks() && nextScheduledTaskDeadlineNanos() == -1;
840 }
841
842
843
844
845 protected boolean confirmShutdown() {
846 if (!isShuttingDown()) {
847 return false;
848 }
849
850 if (!inEventLoop()) {
851 throw new IllegalStateException("must be invoked from an event loop");
852 }
853
854 cancelScheduledTasks();
855
856 if (gracefulShutdownStartTime == 0) {
857 gracefulShutdownStartTime = getCurrentTimeNanos();
858 }
859
860 if (runAllTasks() || runShutdownHooks()) {
861 if (isShutdown()) {
862
863 return true;
864 }
865
866
867
868
869 if (gracefulShutdownQuietPeriod == 0) {
870 return true;
871 }
872 taskQueue.offer(WAKEUP_TASK);
873 return false;
874 }
875
876 final long nanoTime = getCurrentTimeNanos();
877
878 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
879 return true;
880 }
881
882 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
883
884
885 taskQueue.offer(WAKEUP_TASK);
886 try {
887 Thread.sleep(100);
888 } catch (InterruptedException e) {
889
890 }
891
892 return false;
893 }
894
895
896
897 return true;
898 }
899
900 @Override
901 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
902 ObjectUtil.checkNotNull(unit, "unit");
903 if (inEventLoop()) {
904 throw new IllegalStateException("cannot await termination of the current thread");
905 }
906
907 threadLock.await(timeout, unit);
908
909 return isTerminated();
910 }
911
912 @Override
913 public void execute(Runnable task) {
914 execute0(task);
915 }
916
917 @Override
918 public void lazyExecute(Runnable task) {
919 lazyExecute0(task);
920 }
921
922 private void execute0(@Schedule Runnable task) {
923 ObjectUtil.checkNotNull(task, "task");
924 execute(task, wakesUpForTask(task));
925 }
926
927 private void lazyExecute0(@Schedule Runnable task) {
928 execute(ObjectUtil.checkNotNull(task, "task"), false);
929 }
930
931 @Override
932 void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
933 ObjectUtil.checkNotNull(task, "task");
934 int currentState = state;
935 if (supportSuspension && currentState == ST_SUSPENDED) {
936
937
938
939 execute(new Runnable() {
940 @Override
941 public void run() {
942 task.run();
943 if (canSuspend(ST_SUSPENDED)) {
944
945
946 trySuspend();
947 }
948 }
949 }, true);
950 } else {
951
952 execute(task, false);
953 }
954 }
955
956 private void execute(Runnable task, boolean immediate) {
957 boolean inEventLoop = inEventLoop();
958 addTask(task);
959 if (!inEventLoop) {
960 startThread();
961 if (isShutdown()) {
962 boolean reject = false;
963 try {
964 if (removeTask(task)) {
965 reject = true;
966 }
967 } catch (UnsupportedOperationException e) {
968
969
970
971 }
972 if (reject) {
973 reject();
974 }
975 }
976 }
977
978 if (!addTaskWakesUp && immediate) {
979 wakeup(inEventLoop);
980 }
981 }
982
983 @Override
984 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
985 throwIfInEventLoop("invokeAny");
986 return super.invokeAny(tasks);
987 }
988
989 @Override
990 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
991 throws InterruptedException, ExecutionException, TimeoutException {
992 throwIfInEventLoop("invokeAny");
993 return super.invokeAny(tasks, timeout, unit);
994 }
995
996 @Override
997 public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
998 throws InterruptedException {
999 throwIfInEventLoop("invokeAll");
1000 return super.invokeAll(tasks);
1001 }
1002
1003 @Override
1004 public <T> List<java.util.concurrent.Future<T>> invokeAll(
1005 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
1006 throwIfInEventLoop("invokeAll");
1007 return super.invokeAll(tasks, timeout, unit);
1008 }
1009
1010 private void throwIfInEventLoop(String method) {
1011 if (inEventLoop()) {
1012 throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
1013 }
1014 }
1015
1016
1017
1018
1019
1020
1021 public final ThreadProperties threadProperties() {
1022 ThreadProperties threadProperties = this.threadProperties;
1023 if (threadProperties == null) {
1024 Thread thread = this.thread;
1025 if (thread == null) {
1026 assert !inEventLoop();
1027 submit(NOOP_TASK).syncUninterruptibly();
1028 thread = this.thread;
1029 assert thread != null;
1030 }
1031
1032 threadProperties = new DefaultThreadProperties(thread);
1033 if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
1034 threadProperties = this.threadProperties;
1035 }
1036 }
1037
1038 return threadProperties;
1039 }
1040
1041
1042
1043
1044 @Deprecated
1045 protected interface NonWakeupRunnable extends LazyRunnable { }
1046
1047
1048
1049
1050
1051 protected boolean wakesUpForTask(Runnable task) {
1052 return true;
1053 }
1054
1055 protected static void reject() {
1056 throw new RejectedExecutionException("event executor terminated");
1057 }
1058
1059
1060
1061
1062
1063
1064 protected final void reject(Runnable task) {
1065 rejectedExecutionHandler.rejected(task, this);
1066 }
1067
1068
1069
1070 private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
1071
1072 private void startThread() {
1073 int currentState = state;
1074 if (currentState == ST_NOT_STARTED || currentState == ST_SUSPENDED) {
1075 if (STATE_UPDATER.compareAndSet(this, currentState, ST_STARTED)) {
1076 boolean success = false;
1077 try {
1078 doStartThread();
1079 success = true;
1080 } finally {
1081 if (!success) {
1082 STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
1083 }
1084 }
1085 }
1086 }
1087 }
1088
1089 private boolean ensureThreadStarted(int oldState) {
1090 if (oldState == ST_NOT_STARTED || oldState == ST_SUSPENDED) {
1091 try {
1092 doStartThread();
1093 } catch (Throwable cause) {
1094 STATE_UPDATER.set(this, ST_TERMINATED);
1095 terminationFuture.tryFailure(cause);
1096
1097 if (!(cause instanceof Exception)) {
1098
1099 PlatformDependent.throwException(cause);
1100 }
1101 return true;
1102 }
1103 }
1104 return false;
1105 }
1106
1107 private void doStartThread() {
1108 executor.execute(new Runnable() {
1109 @Override
1110 public void run() {
1111 processingLock.lock();
1112 assert thread == null;
1113 thread = Thread.currentThread();
1114 if (interrupted) {
1115 thread.interrupt();
1116 interrupted = false;
1117 }
1118 boolean success = false;
1119 updateLastExecutionTime();
1120 boolean suspend = false;
1121 try {
1122 for (;;) {
1123 SingleThreadEventExecutor.this.run();
1124 success = true;
1125
1126 int currentState = state;
1127 if (canSuspend(currentState)) {
1128 if (!STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
1129 ST_SUSPENDING, ST_SUSPENDED)) {
1130
1131 continue;
1132 }
1133
1134 if (!canSuspend(ST_SUSPENDED) && STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
1135 ST_SUSPENDED, ST_STARTED)) {
1136
1137
1138 continue;
1139 }
1140 suspend = true;
1141 }
1142 break;
1143 }
1144 } catch (Throwable t) {
1145 logger.warn("Unexpected exception from an event executor: ", t);
1146 } finally {
1147 boolean shutdown = !suspend;
1148 if (shutdown) {
1149 for (;;) {
1150
1151 int oldState = state;
1152 if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
1153 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
1154 break;
1155 }
1156 }
1157 if (success && gracefulShutdownStartTime == 0) {
1158
1159 if (logger.isErrorEnabled()) {
1160 logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
1161 SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
1162 "be called before run() implementation terminates.");
1163 }
1164 }
1165 }
1166
1167 try {
1168 if (shutdown) {
1169
1170
1171
1172 for (;;) {
1173 if (confirmShutdown()) {
1174 break;
1175 }
1176 }
1177
1178
1179
1180 for (;;) {
1181 int currentState = state;
1182 if (currentState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
1183 SingleThreadEventExecutor.this, currentState, ST_SHUTDOWN)) {
1184 break;
1185 }
1186 }
1187
1188
1189
1190 confirmShutdown();
1191 }
1192 } finally {
1193 try {
1194 if (shutdown) {
1195 try {
1196 cleanup();
1197 } finally {
1198
1199
1200
1201
1202 FastThreadLocal.removeAll();
1203
1204 STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
1205 threadLock.countDown();
1206 int numUserTasks = drainTasks();
1207 if (numUserTasks > 0 && logger.isWarnEnabled()) {
1208 logger.warn("An event executor terminated with " +
1209 "non-empty task queue (" + numUserTasks + ')');
1210 }
1211 terminationFuture.setSuccess(null);
1212 }
1213 } else {
1214
1215 FastThreadLocal.removeAll();
1216
1217
1218 threadProperties = null;
1219 }
1220 } finally {
1221 thread = null;
1222
1223 processingLock.unlock();
1224 }
1225 }
1226 }
1227 }
1228 });
1229 }
1230
1231 final int drainTasks() {
1232 int numTasks = 0;
1233 for (;;) {
1234 Runnable runnable = taskQueue.poll();
1235 if (runnable == null) {
1236 break;
1237 }
1238
1239
1240 if (WAKEUP_TASK != runnable) {
1241 numTasks++;
1242 }
1243 }
1244 return numTasks;
1245 }
1246
1247 private static final class DefaultThreadProperties implements ThreadProperties {
1248 private final Thread t;
1249
1250 DefaultThreadProperties(Thread t) {
1251 this.t = t;
1252 }
1253
1254 @Override
1255 public State state() {
1256 return t.getState();
1257 }
1258
1259 @Override
1260 public int priority() {
1261 return t.getPriority();
1262 }
1263
1264 @Override
1265 public boolean isInterrupted() {
1266 return t.isInterrupted();
1267 }
1268
1269 @Override
1270 public boolean isDaemon() {
1271 return t.isDaemon();
1272 }
1273
1274 @Override
1275 public String name() {
1276 return t.getName();
1277 }
1278
1279 @Override
1280 public long id() {
1281 return t.getId();
1282 }
1283
1284 @Override
1285 public StackTraceElement[] stackTrace() {
1286 return t.getStackTrace();
1287 }
1288
1289 @Override
1290 public boolean isAlive() {
1291 return t.isAlive();
1292 }
1293 }
1294 }