1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.SystemPropertyUtil;
21 import io.netty.util.internal.ThreadExecutorMap;
22 import io.netty.util.internal.logging.InternalLogger;
23 import io.netty.util.internal.logging.InternalLoggerFactory;
24 import org.jetbrains.annotations.Async.Schedule;
25
26 import java.lang.Thread.State;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.LinkedHashSet;
30 import java.util.List;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicInteger;
44 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
45 import java.util.concurrent.atomic.AtomicLong;
46 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
47 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
48 import java.util.concurrent.locks.Lock;
49 import java.util.concurrent.locks.ReentrantLock;
50
51
52
53
54
55 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
56
57 static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
58 SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
59
60 private static final InternalLogger logger =
61 InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
62
63 private static final int ST_NOT_STARTED = 1;
64 private static final int ST_SUSPENDING = 2;
65 private static final int ST_SUSPENDED = 3;
66 private static final int ST_STARTED = 4;
67 private static final int ST_SHUTTING_DOWN = 5;
68 private static final int ST_SHUTDOWN = 6;
69 private static final int ST_TERMINATED = 7;
70
71 private static final Runnable NOOP_TASK = new Runnable() {
72 @Override
73 public void run() {
74
75 }
76 };
77
78 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
79 AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
80 private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
81 AtomicReferenceFieldUpdater.newUpdater(
82 SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
83 private static final AtomicLongFieldUpdater<SingleThreadEventExecutor> ACCUMULATED_ACTIVE_TIME_NANOS_UPDATER =
84 AtomicLongFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "accumulatedActiveTimeNanos");
85 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> CONSECUTIVE_IDLE_CYCLES_UPDATER =
86 AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "consecutiveIdleCycles");
87 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> CONSECUTIVE_BUSY_CYCLES_UPDATER =
88 AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "consecutiveBusyCycles");
89 private final Queue<Runnable> taskQueue;
90
91 private volatile Thread thread;
92 @SuppressWarnings("unused")
93 private volatile ThreadProperties threadProperties;
94 private final Executor executor;
95 private volatile boolean interrupted;
96
97 private final Lock processingLock = new ReentrantLock();
98 private final CountDownLatch threadLock = new CountDownLatch(1);
99 private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
100 private final boolean addTaskWakesUp;
101 private final int maxPendingTasks;
102 private final RejectedExecutionHandler rejectedExecutionHandler;
103 private final boolean supportSuspension;
104
105
106 private volatile long accumulatedActiveTimeNanos;
107
108 private volatile long lastActivityTimeNanos;
109
110
111
112
113 private volatile int consecutiveIdleCycles;
114
115
116
117
118
119 private volatile int consecutiveBusyCycles;
120 private long lastExecutionTime;
121
122 @SuppressWarnings({ "FieldMayBeFinal", "unused" })
123 private volatile int state = ST_NOT_STARTED;
124
125 private volatile long gracefulShutdownQuietPeriod;
126 private volatile long gracefulShutdownTimeout;
127 private long gracefulShutdownStartTime;
128
129 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
130
131
132
133
134
135
136
137
138
139 protected SingleThreadEventExecutor(
140 EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
141 this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
142 }
143
144
145
146
147
148
149
150
151
152
153
154 protected SingleThreadEventExecutor(
155 EventExecutorGroup parent, ThreadFactory threadFactory,
156 boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
157 this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
158 }
159
160
161
162
163
164
165
166
167
168
169
170
171 protected SingleThreadEventExecutor(
172 EventExecutorGroup parent, ThreadFactory threadFactory,
173 boolean addTaskWakesUp, boolean supportSuspension,
174 int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
175 this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, supportSuspension,
176 maxPendingTasks, rejectedHandler);
177 }
178
179
180
181
182
183
184
185
186
187 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
188 this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
189 }
190
191
192
193
194
195
196
197
198
199
200
201 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
202 boolean addTaskWakesUp, int maxPendingTasks,
203 RejectedExecutionHandler rejectedHandler) {
204 this(parent, executor, addTaskWakesUp, false, maxPendingTasks, rejectedHandler);
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
219 boolean addTaskWakesUp, boolean supportSuspension,
220 int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
221 super(parent);
222 this.addTaskWakesUp = addTaskWakesUp;
223 this.supportSuspension = supportSuspension;
224 this.maxPendingTasks = Math.max(16, maxPendingTasks);
225 this.executor = ThreadExecutorMap.apply(executor, this);
226 taskQueue = newTaskQueue(this.maxPendingTasks);
227 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
228 lastActivityTimeNanos = ticker().nanoTime();
229 }
230
231 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
232 boolean addTaskWakesUp, Queue<Runnable> taskQueue,
233 RejectedExecutionHandler rejectedHandler) {
234 this(parent, executor, addTaskWakesUp, false, taskQueue, rejectedHandler);
235 }
236
237 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
238 boolean addTaskWakesUp, boolean supportSuspension,
239 Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
240 super(parent);
241 this.addTaskWakesUp = addTaskWakesUp;
242 this.supportSuspension = supportSuspension;
243 this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
244 this.executor = ThreadExecutorMap.apply(executor, this);
245 this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
246 this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
247 }
248
249
250
251
252 @Deprecated
253 protected Queue<Runnable> newTaskQueue() {
254 return newTaskQueue(maxPendingTasks);
255 }
256
257
258
259
260
261
262
263 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
264 return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
265 }
266
267
268
269
270 protected void interruptThread() {
271 Thread currentThread = thread;
272 if (currentThread == null) {
273 interrupted = true;
274 } else {
275 currentThread.interrupt();
276 }
277 }
278
279
280
281
282 protected Runnable pollTask() {
283 assert inEventLoop();
284 return pollTaskFrom(taskQueue);
285 }
286
287 protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
288 for (;;) {
289 Runnable task = taskQueue.poll();
290 if (task != WAKEUP_TASK) {
291 return task;
292 }
293 }
294 }
295
296
297
298
299
300
301
302
303
304
305 protected Runnable takeTask() {
306 assert inEventLoop();
307 if (!(taskQueue instanceof BlockingQueue)) {
308 throw new UnsupportedOperationException();
309 }
310
311 BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
312 for (;;) {
313 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
314 if (scheduledTask == null) {
315 Runnable task = null;
316 try {
317 task = taskQueue.take();
318 if (task == WAKEUP_TASK) {
319 task = null;
320 }
321 } catch (InterruptedException e) {
322
323 }
324 return task;
325 } else {
326 long delayNanos = scheduledTask.delayNanos();
327 Runnable task = null;
328 if (delayNanos > 0) {
329 try {
330 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
331 } catch (InterruptedException e) {
332
333 return null;
334 }
335 }
336 if (task == null) {
337
338
339
340
341 fetchFromScheduledTaskQueue();
342 task = taskQueue.poll();
343 }
344
345 if (task != null) {
346 if (task == WAKEUP_TASK) {
347 return null;
348 }
349 return task;
350 }
351 }
352 }
353 }
354
355 private boolean fetchFromScheduledTaskQueue() {
356 return fetchFromScheduledTaskQueue(taskQueue);
357 }
358
359
360
361
362 private boolean executeExpiredScheduledTasks() {
363 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
364 return false;
365 }
366 long nanoTime = getCurrentTimeNanos();
367 Runnable scheduledTask = pollScheduledTask(nanoTime);
368 if (scheduledTask == null) {
369 return false;
370 }
371 do {
372 safeExecute(scheduledTask);
373 } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
374 return true;
375 }
376
377
378
379
380 protected Runnable peekTask() {
381 assert inEventLoop();
382 return taskQueue.peek();
383 }
384
385
386
387
388 protected boolean hasTasks() {
389 assert inEventLoop();
390 return !taskQueue.isEmpty();
391 }
392
393
394
395
396 public int pendingTasks() {
397 return taskQueue.size();
398 }
399
400
401
402
403
404 protected void addTask(Runnable task) {
405 ObjectUtil.checkNotNull(task, "task");
406 if (!offerTask(task)) {
407 reject(task);
408 }
409 }
410
411 final boolean offerTask(Runnable task) {
412 if (isShutdown()) {
413 reject();
414 }
415 return taskQueue.offer(task);
416 }
417
418
419
420
421 protected boolean removeTask(Runnable task) {
422 return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
423 }
424
425
426
427
428
429
430 protected boolean runAllTasks() {
431 assert inEventLoop();
432 boolean fetchedAll;
433 boolean ranAtLeastOne = false;
434
435 do {
436 fetchedAll = fetchFromScheduledTaskQueue(taskQueue);
437 if (runAllTasksFrom(taskQueue)) {
438 ranAtLeastOne = true;
439 }
440 } while (!fetchedAll);
441
442 if (ranAtLeastOne) {
443 lastExecutionTime = getCurrentTimeNanos();
444 }
445 afterRunningAllTasks();
446 return ranAtLeastOne;
447 }
448
449
450
451
452
453
454
455
456
457 protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
458 assert inEventLoop();
459 boolean ranAtLeastOneTask;
460 int drainAttempt = 0;
461 do {
462
463
464 ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
465 } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
466
467 if (drainAttempt > 0) {
468 lastExecutionTime = getCurrentTimeNanos();
469 }
470 afterRunningAllTasks();
471
472 return drainAttempt > 0;
473 }
474
475
476
477
478
479
480
481
482 protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
483 Runnable task = pollTaskFrom(taskQueue);
484 if (task == null) {
485 return false;
486 }
487 for (;;) {
488 safeExecute(task);
489 task = pollTaskFrom(taskQueue);
490 if (task == null) {
491 return true;
492 }
493 }
494 }
495
496
497
498
499
500
501 private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
502 Runnable task = pollTaskFrom(taskQueue);
503 if (task == null) {
504 return false;
505 }
506 int remaining = Math.min(maxPendingTasks, taskQueue.size());
507 safeExecute(task);
508
509
510 while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
511 safeExecute(task);
512 }
513 return true;
514 }
515
516
517
518
519
520 @SuppressWarnings("NonAtomicOperationOnVolatileField")
521 protected boolean runAllTasks(long timeoutNanos) {
522 fetchFromScheduledTaskQueue(taskQueue);
523 Runnable task = pollTask();
524 if (task == null) {
525 afterRunningAllTasks();
526 return false;
527 }
528
529 final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
530 long runTasks = 0;
531 long lastExecutionTime;
532
533 long workStartTime = ticker().nanoTime();
534 for (;;) {
535 safeExecute(task);
536
537 runTasks ++;
538
539
540
541 if ((runTasks & 0x3F) == 0) {
542 lastExecutionTime = getCurrentTimeNanos();
543 if (lastExecutionTime >= deadline) {
544 break;
545 }
546 }
547
548 task = pollTask();
549 if (task == null) {
550 lastExecutionTime = getCurrentTimeNanos();
551 break;
552 }
553 }
554
555 long workEndTime = ticker().nanoTime();
556 accumulatedActiveTimeNanos += workEndTime - workStartTime;
557 lastActivityTimeNanos = workEndTime;
558
559 afterRunningAllTasks();
560 this.lastExecutionTime = lastExecutionTime;
561 return true;
562 }
563
564
565
566
567 protected void afterRunningAllTasks() { }
568
569
570
571
572 protected long delayNanos(long currentTimeNanos) {
573 currentTimeNanos -= ticker().initialNanoTime();
574
575 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
576 if (scheduledTask == null) {
577 return SCHEDULE_PURGE_INTERVAL;
578 }
579
580 return scheduledTask.delayNanos(currentTimeNanos);
581 }
582
583
584
585
586
587 protected long deadlineNanos() {
588 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
589 if (scheduledTask == null) {
590 return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
591 }
592 return scheduledTask.deadlineNanos();
593 }
594
595
596
597
598
599
600
601
602 protected void updateLastExecutionTime() {
603 long now = getCurrentTimeNanos();
604 lastExecutionTime = now;
605 lastActivityTimeNanos = now;
606 }
607
608
609
610
611
612
613
614 protected int getNumOfRegisteredChannels() {
615 return -1;
616 }
617
618
619
620
621
622
623
624
625
626 @SuppressWarnings("NonAtomicOperationOnVolatileField")
627 protected void reportActiveIoTime(long nanos) {
628 assert inEventLoop();
629 if (nanos > 0) {
630 accumulatedActiveTimeNanos += nanos;
631 lastActivityTimeNanos = ticker().nanoTime();
632 }
633 }
634
635
636
637
638 protected long getAndResetAccumulatedActiveTimeNanos() {
639 return ACCUMULATED_ACTIVE_TIME_NANOS_UPDATER.getAndSet(this, 0);
640 }
641
642
643
644
645 protected long getLastActivityTimeNanos() {
646 return lastActivityTimeNanos;
647 }
648
649
650
651
652
653
654
655 protected int getAndIncrementIdleCycles() {
656 return CONSECUTIVE_IDLE_CYCLES_UPDATER.getAndIncrement(this);
657 }
658
659
660
661
662
663 protected void resetIdleCycles() {
664 CONSECUTIVE_IDLE_CYCLES_UPDATER.set(this, 0);
665 }
666
667
668
669
670
671
672
673 protected int getAndIncrementBusyCycles() {
674 return CONSECUTIVE_BUSY_CYCLES_UPDATER.getAndIncrement(this);
675 }
676
677
678
679
680
681 protected void resetBusyCycles() {
682 CONSECUTIVE_BUSY_CYCLES_UPDATER.set(this, 0);
683 }
684
685
686
687
688 protected boolean isSuspensionSupported() {
689 return supportSuspension;
690 }
691
692
693
694
695 protected abstract void run();
696
697
698
699
700 protected void cleanup() {
701
702 }
703
704 protected void wakeup(boolean inEventLoop) {
705 if (!inEventLoop) {
706
707
708 taskQueue.offer(WAKEUP_TASK);
709 }
710 }
711
712 @Override
713 public boolean inEventLoop(Thread thread) {
714 return thread == this.thread;
715 }
716
717
718
719
720 public void addShutdownHook(final Runnable task) {
721 if (inEventLoop()) {
722 shutdownHooks.add(task);
723 } else {
724 execute(new Runnable() {
725 @Override
726 public void run() {
727 shutdownHooks.add(task);
728 }
729 });
730 }
731 }
732
733
734
735
736 public void removeShutdownHook(final Runnable task) {
737 if (inEventLoop()) {
738 shutdownHooks.remove(task);
739 } else {
740 execute(new Runnable() {
741 @Override
742 public void run() {
743 shutdownHooks.remove(task);
744 }
745 });
746 }
747 }
748
749 private boolean runShutdownHooks() {
750 boolean ran = false;
751
752 while (!shutdownHooks.isEmpty()) {
753 List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
754 shutdownHooks.clear();
755 for (Runnable task: copy) {
756 try {
757 runTask(task);
758 } catch (Throwable t) {
759 logger.warn("Shutdown hook raised an exception.", t);
760 } finally {
761 ran = true;
762 }
763 }
764 }
765
766 if (ran) {
767 lastExecutionTime = getCurrentTimeNanos();
768 }
769
770 return ran;
771 }
772
773 private void shutdown0(long quietPeriod, long timeout, int shutdownState) {
774 if (isShuttingDown()) {
775 return;
776 }
777
778 boolean inEventLoop = inEventLoop();
779 boolean wakeup;
780 int oldState;
781 for (;;) {
782 if (isShuttingDown()) {
783 return;
784 }
785 int newState;
786 wakeup = true;
787 oldState = state;
788 if (inEventLoop) {
789 newState = shutdownState;
790 } else {
791 switch (oldState) {
792 case ST_NOT_STARTED:
793 case ST_STARTED:
794 case ST_SUSPENDING:
795 case ST_SUSPENDED:
796 newState = shutdownState;
797 break;
798 default:
799 newState = oldState;
800 wakeup = false;
801 }
802 }
803 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
804 break;
805 }
806 }
807 if (quietPeriod != -1) {
808 gracefulShutdownQuietPeriod = quietPeriod;
809 }
810 if (timeout != -1) {
811 gracefulShutdownTimeout = timeout;
812 }
813
814 if (ensureThreadStarted(oldState)) {
815 return;
816 }
817
818 if (wakeup) {
819 taskQueue.offer(WAKEUP_TASK);
820 if (!addTaskWakesUp) {
821 wakeup(inEventLoop);
822 }
823 }
824 }
825
826 @Override
827 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
828 ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
829 if (timeout < quietPeriod) {
830 throw new IllegalArgumentException(
831 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
832 }
833 ObjectUtil.checkNotNull(unit, "unit");
834
835 shutdown0(unit.toNanos(quietPeriod), unit.toNanos(timeout), ST_SHUTTING_DOWN);
836 return terminationFuture();
837 }
838
839 @Override
840 public Future<?> terminationFuture() {
841 return terminationFuture;
842 }
843
844 @Override
845 @Deprecated
846 public void shutdown() {
847 shutdown0(-1, -1, ST_SHUTDOWN);
848 }
849
850 @Override
851 public boolean isShuttingDown() {
852 return state >= ST_SHUTTING_DOWN;
853 }
854
855 @Override
856 public boolean isShutdown() {
857 return state >= ST_SHUTDOWN;
858 }
859
860 @Override
861 public boolean isTerminated() {
862 return state == ST_TERMINATED;
863 }
864
865 @Override
866 public boolean isSuspended() {
867 int currentState = state;
868 return currentState == ST_SUSPENDED || currentState == ST_SUSPENDING;
869 }
870
871 @Override
872 public boolean trySuspend() {
873 if (supportSuspension) {
874 if (STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_SUSPENDING)) {
875 wakeup(inEventLoop());
876 return true;
877 }
878 int currentState = state;
879 return currentState == ST_SUSPENDED || currentState == ST_SUSPENDING;
880 }
881 return false;
882 }
883
884
885
886
887
888
889
890 protected boolean canSuspend() {
891 return canSuspend(state);
892 }
893
894
895
896
897
898
899
900
901
902
903 protected boolean canSuspend(int state) {
904 assert inEventLoop();
905 return supportSuspension && (state == ST_SUSPENDED || state == ST_SUSPENDING)
906 && !hasTasks() && nextScheduledTaskDeadlineNanos() == -1;
907 }
908
909
910
911
912 protected boolean confirmShutdown() {
913 if (!isShuttingDown()) {
914 return false;
915 }
916
917 if (!inEventLoop()) {
918 throw new IllegalStateException("must be invoked from an event loop");
919 }
920
921 cancelScheduledTasks();
922
923 if (gracefulShutdownStartTime == 0) {
924 gracefulShutdownStartTime = getCurrentTimeNanos();
925 }
926
927 if (runAllTasks() || runShutdownHooks()) {
928 if (isShutdown()) {
929
930 return true;
931 }
932
933
934
935
936 if (gracefulShutdownQuietPeriod == 0) {
937 return true;
938 }
939 taskQueue.offer(WAKEUP_TASK);
940 return false;
941 }
942
943 final long nanoTime = getCurrentTimeNanos();
944
945 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
946 return true;
947 }
948
949 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
950
951
952 taskQueue.offer(WAKEUP_TASK);
953 try {
954 Thread.sleep(100);
955 } catch (InterruptedException e) {
956
957 }
958
959 return false;
960 }
961
962
963
964 return true;
965 }
966
967 @Override
968 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
969 ObjectUtil.checkNotNull(unit, "unit");
970 if (inEventLoop()) {
971 throw new IllegalStateException("cannot await termination of the current thread");
972 }
973
974 threadLock.await(timeout, unit);
975
976 return isTerminated();
977 }
978
979 @Override
980 public void execute(Runnable task) {
981 execute0(task);
982 }
983
984 @Override
985 public void lazyExecute(Runnable task) {
986 lazyExecute0(task);
987 }
988
989 private void execute0(@Schedule Runnable task) {
990 ObjectUtil.checkNotNull(task, "task");
991 execute(task, wakesUpForTask(task));
992 }
993
994 private void lazyExecute0(@Schedule Runnable task) {
995 execute(ObjectUtil.checkNotNull(task, "task"), false);
996 }
997
998 @Override
999 void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
1000 ObjectUtil.checkNotNull(task, "task");
1001 int currentState = state;
1002 if (supportSuspension && currentState == ST_SUSPENDED) {
1003
1004
1005
1006 execute(new Runnable() {
1007 @Override
1008 public void run() {
1009 task.run();
1010 if (canSuspend(ST_SUSPENDED)) {
1011
1012
1013 trySuspend();
1014 }
1015 }
1016 }, true);
1017 } else {
1018
1019 execute(task, false);
1020 }
1021 }
1022
1023 private void execute(Runnable task, boolean immediate) {
1024 boolean inEventLoop = inEventLoop();
1025 addTask(task);
1026 if (!inEventLoop) {
1027 startThread();
1028 if (isShutdown()) {
1029 boolean reject = false;
1030 try {
1031 if (removeTask(task)) {
1032 reject = true;
1033 }
1034 } catch (UnsupportedOperationException e) {
1035
1036
1037
1038 }
1039 if (reject) {
1040 reject();
1041 }
1042 }
1043 }
1044
1045 if (!addTaskWakesUp && immediate) {
1046 wakeup(inEventLoop);
1047 }
1048 }
1049
1050 @Override
1051 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
1052 throwIfInEventLoop("invokeAny");
1053 return super.invokeAny(tasks);
1054 }
1055
1056 @Override
1057 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
1058 throws InterruptedException, ExecutionException, TimeoutException {
1059 throwIfInEventLoop("invokeAny");
1060 return super.invokeAny(tasks, timeout, unit);
1061 }
1062
1063 @Override
1064 public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
1065 throws InterruptedException {
1066 throwIfInEventLoop("invokeAll");
1067 return super.invokeAll(tasks);
1068 }
1069
1070 @Override
1071 public <T> List<java.util.concurrent.Future<T>> invokeAll(
1072 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
1073 throwIfInEventLoop("invokeAll");
1074 return super.invokeAll(tasks, timeout, unit);
1075 }
1076
1077 private void throwIfInEventLoop(String method) {
1078 if (inEventLoop()) {
1079 throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
1080 }
1081 }
1082
1083
1084
1085
1086
1087
1088 public final ThreadProperties threadProperties() {
1089 ThreadProperties threadProperties = this.threadProperties;
1090 if (threadProperties == null) {
1091 Thread thread = this.thread;
1092 if (thread == null) {
1093 assert !inEventLoop();
1094 submit(NOOP_TASK).syncUninterruptibly();
1095 thread = this.thread;
1096 assert thread != null;
1097 }
1098
1099 threadProperties = new DefaultThreadProperties(thread);
1100 if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
1101 threadProperties = this.threadProperties;
1102 }
1103 }
1104
1105 return threadProperties;
1106 }
1107
1108
1109
1110
1111 @Deprecated
1112 protected interface NonWakeupRunnable extends LazyRunnable { }
1113
1114
1115
1116
1117
1118 protected boolean wakesUpForTask(Runnable task) {
1119 return true;
1120 }
1121
1122 protected static void reject() {
1123 throw new RejectedExecutionException("event executor terminated");
1124 }
1125
1126
1127
1128
1129
1130
1131 protected final void reject(Runnable task) {
1132 rejectedExecutionHandler.rejected(task, this);
1133 }
1134
1135
1136
1137 private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
1138
1139 private void startThread() {
1140 int currentState = state;
1141 if (currentState == ST_NOT_STARTED || currentState == ST_SUSPENDED) {
1142 if (STATE_UPDATER.compareAndSet(this, currentState, ST_STARTED)) {
1143 resetIdleCycles();
1144 resetBusyCycles();
1145 boolean success = false;
1146 try {
1147 doStartThread();
1148 success = true;
1149 } finally {
1150 if (!success) {
1151 STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
1152 }
1153 }
1154 }
1155 }
1156 }
1157
1158 private boolean ensureThreadStarted(int oldState) {
1159 if (oldState == ST_NOT_STARTED || oldState == ST_SUSPENDED) {
1160 try {
1161 doStartThread();
1162 } catch (Throwable cause) {
1163 STATE_UPDATER.set(this, ST_TERMINATED);
1164 terminationFuture.tryFailure(cause);
1165
1166 if (!(cause instanceof Exception)) {
1167
1168 PlatformDependent.throwException(cause);
1169 }
1170 return true;
1171 }
1172 }
1173 return false;
1174 }
1175
1176 private void doStartThread() {
1177 executor.execute(new Runnable() {
1178 @Override
1179 public void run() {
1180 processingLock.lock();
1181 assert thread == null;
1182 thread = Thread.currentThread();
1183 if (interrupted) {
1184 thread.interrupt();
1185 interrupted = false;
1186 }
1187 boolean success = false;
1188 Throwable unexpectedException = null;
1189 updateLastExecutionTime();
1190 boolean suspend = false;
1191 try {
1192 for (;;) {
1193 SingleThreadEventExecutor.this.run();
1194 success = true;
1195
1196 int currentState = state;
1197 if (canSuspend(currentState)) {
1198 if (!STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
1199 ST_SUSPENDING, ST_SUSPENDED)) {
1200
1201 continue;
1202 }
1203
1204 if (!canSuspend(ST_SUSPENDED) && STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
1205 ST_SUSPENDED, ST_STARTED)) {
1206
1207
1208 continue;
1209 }
1210 suspend = true;
1211 }
1212 break;
1213 }
1214 } catch (Throwable t) {
1215 unexpectedException = t;
1216 logger.warn("Unexpected exception from an event executor: ", t);
1217 } finally {
1218 boolean shutdown = !suspend;
1219 if (shutdown) {
1220 for (;;) {
1221
1222 int oldState = state;
1223 if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
1224 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
1225 break;
1226 }
1227 }
1228 if (success && gracefulShutdownStartTime == 0) {
1229
1230 if (logger.isErrorEnabled()) {
1231 logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
1232 SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
1233 "be called before run() implementation terminates.");
1234 }
1235 }
1236 }
1237
1238 try {
1239 if (shutdown) {
1240
1241
1242
1243 for (;;) {
1244 if (confirmShutdown()) {
1245 break;
1246 }
1247 }
1248
1249
1250
1251 for (;;) {
1252 int currentState = state;
1253 if (currentState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
1254 SingleThreadEventExecutor.this, currentState, ST_SHUTDOWN)) {
1255 break;
1256 }
1257 }
1258
1259
1260
1261 confirmShutdown();
1262 }
1263 } finally {
1264 try {
1265 if (shutdown) {
1266 try {
1267 cleanup();
1268 } finally {
1269
1270
1271
1272
1273 FastThreadLocal.removeAll();
1274
1275 STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
1276 threadLock.countDown();
1277 int numUserTasks = drainTasks();
1278 if (numUserTasks > 0 && logger.isWarnEnabled()) {
1279 logger.warn("An event executor terminated with " +
1280 "non-empty task queue (" + numUserTasks + ')');
1281 }
1282 if (unexpectedException == null) {
1283 terminationFuture.setSuccess(null);
1284 } else {
1285 terminationFuture.setFailure(unexpectedException);
1286 }
1287 }
1288 } else {
1289
1290 FastThreadLocal.removeAll();
1291
1292
1293 threadProperties = null;
1294 }
1295 } finally {
1296 thread = null;
1297
1298 processingLock.unlock();
1299 }
1300 }
1301 }
1302 }
1303 });
1304 }
1305
1306 final int drainTasks() {
1307 int numTasks = 0;
1308 for (;;) {
1309 Runnable runnable = taskQueue.poll();
1310 if (runnable == null) {
1311 break;
1312 }
1313
1314
1315 if (WAKEUP_TASK != runnable) {
1316 numTasks++;
1317 }
1318 }
1319 return numTasks;
1320 }
1321
1322 private static final class DefaultThreadProperties implements ThreadProperties {
1323 private final Thread t;
1324
1325 DefaultThreadProperties(Thread t) {
1326 this.t = t;
1327 }
1328
1329 @Override
1330 public State state() {
1331 return t.getState();
1332 }
1333
1334 @Override
1335 public int priority() {
1336 return t.getPriority();
1337 }
1338
1339 @Override
1340 public boolean isInterrupted() {
1341 return t.isInterrupted();
1342 }
1343
1344 @Override
1345 public boolean isDaemon() {
1346 return t.isDaemon();
1347 }
1348
1349 @Override
1350 public String name() {
1351 return t.getName();
1352 }
1353
1354 @Override
1355 public long id() {
1356 return t.getId();
1357 }
1358
1359 @Override
1360 public StackTraceElement[] stackTrace() {
1361 return t.getStackTrace();
1362 }
1363
1364 @Override
1365 public boolean isAlive() {
1366 return t.isAlive();
1367 }
1368 }
1369 }