1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.SystemPropertyUtil;
21 import io.netty.util.internal.ThreadExecutorMap;
22 import io.netty.util.internal.logging.InternalLogger;
23 import io.netty.util.internal.logging.InternalLoggerFactory;
24 import org.jetbrains.annotations.Async.Schedule;
25
26 import java.lang.Thread.State;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.LinkedHashSet;
30 import java.util.List;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicInteger;
44 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
45 import java.util.concurrent.atomic.AtomicLong;
46 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
47 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
48 import java.util.concurrent.locks.Lock;
49 import java.util.concurrent.locks.ReentrantLock;
50
51
52
53
54
55 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
56
    /**
     * Default capacity used for the task queue: the {@code io.netty.eventexecutor.maxPendingTasks}
     * system property (with {@link Integer#MAX_VALUE} passed as the fallback value), but never less than 16.
     */
    static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);

    // Lifecycle states. The numeric order matters: isShuttingDown() and isShutdown() are implemented as
    // "state >= ST_SHUTTING_DOWN" and "state >= ST_SHUTDOWN", so the three shutdown-related states must
    // stay larger than the not-started / suspending / suspended / started states.
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_SUSPENDING = 2;
    private static final int ST_SUSPENDED = 3;
    private static final int ST_STARTED = 4;
    private static final int ST_SHUTTING_DOWN = 5;
    private static final int ST_SHUTDOWN = 6;
    private static final int ST_TERMINATED = 7;

    // Task with an empty body; threadProperties() submits it and waits for it so that the
    // executor thread gets started and the 'thread' field becomes available.
    private static final Runnable NOOP_TASK = new Runnable() {
        @Override
        public void run() {
            // Do nothing.
        }
    };
77
    // Reflection-based updaters for the volatile instance fields declared below. Each string must match
    // the name of the corresponding field exactly ("state", "threadProperties", ...).

    // Performs the compare-and-set transitions of the lifecycle state.
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    // Publishes the lazily created ThreadProperties at most once (see threadProperties()).
    private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
    // Used to atomically read-and-reset the accumulated active time.
    private static final AtomicLongFieldUpdater<SingleThreadEventExecutor> ACCUMULATED_ACTIVE_TIME_NANOS_UPDATER =
            AtomicLongFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "accumulatedActiveTimeNanos");
    // Counters behind getAndIncrementIdleCycles() / resetIdleCycles().
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> CONSECUTIVE_IDLE_CYCLES_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "consecutiveIdleCycles");
    // Counters behind getAndIncrementBusyCycles() / resetBusyCycles().
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> CONSECUTIVE_BUSY_CYCLES_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "consecutiveBusyCycles");
    // Pending tasks submitted through execute(...); also receives WAKEUP_TASK markers.
    private final Queue<Runnable> taskQueue;

    // The thread currently running the loop. Set at the start of the runnable created by doStartThread()
    // and reset to null when that runnable finishes; null while no thread is attached.
    private volatile Thread thread;
    // Lazily created by threadProperties() through PROPERTIES_UPDATER; reset to null when the loop suspends.
    @SuppressWarnings("unused")
    private volatile ThreadProperties threadProperties;
    // Executor that supplies the loop thread (see doStartThread()).
    private final Executor executor;
    // Remembers an interruptThread() request made while no thread was attached; applied on next start.
    private volatile boolean interrupted;

    // Held by the loop runnable for its whole run, so a newly started loop waits for the previous one.
    private final Lock processingLock = new ReentrantLock();
    // Counted down once the executor reaches ST_TERMINATED; awaited by awaitTermination(...).
    private final CountDownLatch threadLock = new CountDownLatch(1);
    // Hooks run during shutdown. Mutations are routed onto the event loop by add/removeShutdownHook.
    private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
    // When true, execute(...) and shutdown0(...) skip the explicit wakeup(...) call.
    private final boolean addTaskWakesUp;
    private final int maxPendingTasks;
    // Invoked by reject(Runnable) when a task cannot be queued.
    private final RejectedExecutionHandler rejectedExecutionHandler;
    // Enables the ST_SUSPENDING / ST_SUSPENDED states (see trySuspend() and canSuspend(int)).
    private final boolean supportSuspension;

    // Time spent doing work since the last getAndResetAccumulatedActiveTimeNanos() call. Added to by
    // runAllTasks(long) and reportActiveIoTime(long).
    private volatile long accumulatedActiveTimeNanos;
    // Timestamp of the most recent recorded activity.
    private volatile long lastActivityTimeNanos;

    // Counter manipulated only through getAndIncrementIdleCycles() / resetIdleCycles().
    private volatile int consecutiveIdleCycles;

    // Counter manipulated only through getAndIncrementBusyCycles() / resetBusyCycles().
    private volatile int consecutiveBusyCycles;
    // Time at which tasks or shutdown hooks last ran; compared against the quiet period in
    // confirmShutdown(). Not volatile.
    private long lastExecutionTime;

    @SuppressWarnings({ "FieldMayBeFinal", "unused" })
    private volatile int state = ST_NOT_STARTED;

    // Quiet period and timeout in nanoseconds, stored by shutdown0(...).
    private volatile long gracefulShutdownQuietPeriod;
    private volatile long gracefulShutdownTimeout;
    // Set on the first confirmShutdown() call; 0 means confirmShutdown() has not run yet.
    private long gracefulShutdownStartTime;

    // Completed (success or failure) when the executor terminates.
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
130
131
132
133
134
135
136
137
138
139 protected SingleThreadEventExecutor(
140 EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
141 this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
142 }
143
144
145
146
147
148
149
150
151
152
153
154 protected SingleThreadEventExecutor(
155 EventExecutorGroup parent, ThreadFactory threadFactory,
156 boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
157 this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
158 }
159
160
161
162
163
164
165
166
167
168
169
170
    /**
     * Creates an executor whose thread comes from the given {@link ThreadFactory}.
     *
     * @param parent            the {@link EventExecutorGroup} passed on to the superclass
     * @param threadFactory     wrapped in a {@link ThreadPerTaskExecutor} to start the executor thread
     * @param addTaskWakesUp    when {@code true}, no explicit {@link #wakeup(boolean)} call is made after
     *                          a task has been added
     * @param supportSuspension whether {@link #trySuspend()} may suspend this executor
     * @param maxPendingTasks   requested task-queue capacity (raised to 16 if smaller)
     * @param rejectedHandler   handler invoked when a task cannot be queued
     */
    protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory,
            boolean addTaskWakesUp, boolean supportSuspension,
            int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
        this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, supportSuspension,
                maxPendingTasks, rejectedHandler);
    }
178
179
180
181
182
183
184
185
186
187 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
188 this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
189 }
190
191
192
193
194
195
196
197
198
199
200
    /**
     * Creates an executor that obtains its thread from {@code executor}; suspension support is disabled.
     *
     * @param parent            the {@link EventExecutorGroup} passed on to the superclass
     * @param executor          the {@link Executor} used to start the executor thread
     * @param addTaskWakesUp    when {@code true}, no explicit {@link #wakeup(boolean)} call is made after
     *                          a task has been added
     * @param maxPendingTasks   requested task-queue capacity (raised to 16 if smaller)
     * @param rejectedHandler   handler invoked when a task cannot be queued
     */
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        this(parent, executor, addTaskWakesUp, false, maxPendingTasks, rejectedHandler);
    }
206
207
208
209
210
211
212
213
214
215
216
217
218 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
219 boolean addTaskWakesUp, boolean supportSuspension,
220 int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
221 super(parent);
222 this.addTaskWakesUp = addTaskWakesUp;
223 this.supportSuspension = supportSuspension;
224 this.maxPendingTasks = Math.max(16, maxPendingTasks);
225 this.executor = ThreadExecutorMap.apply(executor, this);
226 taskQueue = newTaskQueue(this.maxPendingTasks);
227 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
228 lastActivityTimeNanos = ticker().nanoTime();
229 }
230
    /**
     * Creates an executor that uses a caller-supplied task queue; suspension support is disabled.
     *
     * @param parent            the {@link EventExecutorGroup} passed on to the superclass
     * @param executor          the {@link Executor} used to start the executor thread
     * @param addTaskWakesUp    when {@code true}, no explicit {@link #wakeup(boolean)} call is made after
     *                          a task has been added
     * @param taskQueue         the queue that stores pending tasks
     * @param rejectedHandler   handler invoked when a task cannot be queued
     */
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                        RejectedExecutionHandler rejectedHandler) {
        this(parent, executor, addTaskWakesUp, false, taskQueue, rejectedHandler);
    }
236
237 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
238 boolean addTaskWakesUp, boolean supportSuspension,
239 Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
240 super(parent);
241 this.addTaskWakesUp = addTaskWakesUp;
242 this.supportSuspension = supportSuspension;
243 this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
244 this.executor = ThreadExecutorMap.apply(executor, this);
245 this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
246 this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
247 }
248
249
250
251
252 @Deprecated
253 protected Queue<Runnable> newTaskQueue() {
254 return newTaskQueue(maxPendingTasks);
255 }
256
257
258
259
260
261
262
263 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
264 return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
265 }
266
267
268
269
270 protected void interruptThread() {
271 Thread currentThread = thread;
272 if (currentThread == null) {
273 interrupted = true;
274 } else {
275 currentThread.interrupt();
276 }
277 }
278
279
280
281
282 protected Runnable pollTask() {
283 assert inEventLoop();
284 return pollTaskFrom(taskQueue);
285 }
286
287 protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
288 for (;;) {
289 Runnable task = taskQueue.poll();
290 if (task != WAKEUP_TASK) {
291 return task;
292 }
293 }
294 }
295
296
297
298
299
300
301
302
303
304
    /**
     * Takes the next task from the task queue, blocking while none is available. While a scheduled
     * task exists, the wait is bounded by that task's delay, after which due scheduled tasks are
     * moved into the task queue. Must be called from the event loop thread.
     *
     * @return the next task, or {@code null} if the wait was interrupted or a wake-up marker was dequeued
     * @throws UnsupportedOperationException if the task queue is not a {@link BlockingQueue}
     */
    protected Runnable takeTask() {
        assert inEventLoop();
        if (!(taskQueue instanceof BlockingQueue)) {
            throw new UnsupportedOperationException();
        }

        BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
        for (;;) {
            ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
            if (scheduledTask == null) {
                // No scheduled task pending: block on the queue without a timeout.
                Runnable task = null;
                try {
                    task = taskQueue.take();
                    if (task == WAKEUP_TASK) {
                        // A wake-up marker is reported to the caller as "no task".
                        task = null;
                    }
                } catch (InterruptedException e) {
                    // Deliberately ignored: the caller receives null. The interrupt status is not restored.
                }
                return task;
            } else {
                long delayNanos = scheduledTask.delayNanos();
                Runnable task = null;
                if (delayNanos > 0) {
                    // Wait for a regular task, but no longer than until the scheduled task is due.
                    try {
                        task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException e) {
                        // Interrupted while waiting: report "no task" to the caller.
                        return null;
                    }
                }
                if (task == null) {
                    // Either the scheduled task was already due or the timed wait expired: transfer
                    // scheduled tasks into the task queue, then try a non-blocking poll.
                    fetchFromScheduledTaskQueue();
                    task = taskQueue.poll();
                }

                if (task != null) {
                    if (task == WAKEUP_TASK) {
                        return null;
                    }
                    return task;
                }
                // Still nothing to run: loop and re-evaluate the scheduled task queue.
            }
        }
    }
354
355 private boolean fetchFromScheduledTaskQueue() {
356 return fetchFromScheduledTaskQueue(taskQueue);
357 }
358
359
360
361
362 private boolean executeExpiredScheduledTasks() {
363 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
364 return false;
365 }
366 long nanoTime = getCurrentTimeNanos();
367 Runnable scheduledTask = pollScheduledTask(nanoTime);
368 if (scheduledTask == null) {
369 return false;
370 }
371 do {
372 safeExecute(scheduledTask);
373 } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
374 return true;
375 }
376
377
378
379
380 protected Runnable peekTask() {
381 assert inEventLoop();
382 return taskQueue.peek();
383 }
384
385
386
387
388 protected boolean hasTasks() {
389 assert inEventLoop();
390 return !taskQueue.isEmpty();
391 }
392
393
394
395
396 public int pendingTasks() {
397 return taskQueue.size();
398 }
399
400
401
402
403
404 protected void addTask(Runnable task) {
405 ObjectUtil.checkNotNull(task, "task");
406 if (!offerTask(task)) {
407 reject(task);
408 }
409 }
410
411 final boolean offerTask(Runnable task) {
412 if (isShutdown()) {
413 reject();
414 }
415 return taskQueue.offer(task);
416 }
417
418
419
420
421 protected boolean removeTask(Runnable task) {
422 return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
423 }
424
425
426
427
428
429
430 protected boolean runAllTasks() {
431 assert inEventLoop();
432 boolean fetchedAll;
433 boolean ranAtLeastOne = false;
434
435 do {
436 fetchedAll = fetchFromScheduledTaskQueue(taskQueue);
437 if (runAllTasksFrom(taskQueue)) {
438 ranAtLeastOne = true;
439 }
440 } while (!fetchedAll);
441
442 if (ranAtLeastOne) {
443 lastExecutionTime = getCurrentTimeNanos();
444 }
445 afterRunningAllTasks();
446 return ranAtLeastOne;
447 }
448
449
450
451
452
453
454
455
456
457 protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
458 assert inEventLoop();
459 boolean ranAtLeastOneTask;
460 int drainAttempt = 0;
461 do {
462
463
464 ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
465 } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
466
467 if (drainAttempt > 0) {
468 lastExecutionTime = getCurrentTimeNanos();
469 }
470 afterRunningAllTasks();
471
472 return drainAttempt > 0;
473 }
474
475
476
477
478
479
480
481
482 protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
483 Runnable task = pollTaskFrom(taskQueue);
484 if (task == null) {
485 return false;
486 }
487 for (;;) {
488 safeExecute(task);
489 task = pollTaskFrom(taskQueue);
490 if (task == null) {
491 return true;
492 }
493 }
494 }
495
496
497
498
499
500
501 private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
502 Runnable task = pollTaskFrom(taskQueue);
503 if (task == null) {
504 return false;
505 }
506 int remaining = Math.min(maxPendingTasks, taskQueue.size());
507 safeExecute(task);
508
509
510 while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
511 safeExecute(task);
512 }
513 return true;
514 }
515
516
517
518
519
520 @SuppressWarnings("NonAtomicOperationOnVolatileField")
521 protected boolean runAllTasks(long timeoutNanos) {
522 fetchFromScheduledTaskQueue(taskQueue);
523 Runnable task = pollTask();
524 if (task == null) {
525 afterRunningAllTasks();
526 return false;
527 }
528
529 final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
530 long runTasks = 0;
531 long lastExecutionTime;
532
533 long workStartTime = ticker().nanoTime();
534 for (;;) {
535 safeExecute(task);
536
537 runTasks ++;
538
539
540
541 if ((runTasks & 0x3F) == 0) {
542 lastExecutionTime = getCurrentTimeNanos();
543 if (lastExecutionTime >= deadline) {
544 break;
545 }
546 }
547
548 task = pollTask();
549 if (task == null) {
550 lastExecutionTime = getCurrentTimeNanos();
551 break;
552 }
553 }
554
555 long workEndTime = ticker().nanoTime();
556 accumulatedActiveTimeNanos += workEndTime - workStartTime;
557 lastActivityTimeNanos = workEndTime;
558
559 afterRunningAllTasks();
560 this.lastExecutionTime = lastExecutionTime;
561 return true;
562 }
563
564
565
566
    /**
     * Hook invoked at the end of {@link #runAllTasks()}, {@link #runAllTasks(long)} and
     * {@link #runScheduledAndExecutorTasks(int)}, whether or not any task ran. This implementation
     * does nothing.
     */
    protected void afterRunningAllTasks() { }
568
569
570
571
572 protected long delayNanos(long currentTimeNanos) {
573 currentTimeNanos -= ticker().initialNanoTime();
574
575 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
576 if (scheduledTask == null) {
577 return SCHEDULE_PURGE_INTERVAL;
578 }
579
580 return scheduledTask.delayNanos(currentTimeNanos);
581 }
582
583
584
585
586
587 protected long deadlineNanos() {
588 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
589 if (scheduledTask == null) {
590 return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
591 }
592 return scheduledTask.deadlineNanos();
593 }
594
595
596
597
598
599
600
601
602 protected void updateLastExecutionTime() {
603 long now = getCurrentTimeNanos();
604 lastExecutionTime = now;
605 lastActivityTimeNanos = now;
606 }
607
608
609
610
611
612
613
    /**
     * Returns the number of registered channels. This base implementation always returns {@code -1};
     * presumably subclasses that track channels override it (not visible in this file).
     *
     * @return {@code -1}
     */
    protected int getNumOfRegisteredChannels() {
        return -1;
    }
617
618
619
620
621
622
623
624
625
626 @SuppressWarnings("NonAtomicOperationOnVolatileField")
627 protected void reportActiveIoTime(long nanos) {
628 assert inEventLoop();
629 if (nanos > 0) {
630 accumulatedActiveTimeNanos += nanos;
631 lastActivityTimeNanos = ticker().nanoTime();
632 }
633 }
634
635
636
637
638 protected long getAndResetAccumulatedActiveTimeNanos() {
639 return ACCUMULATED_ACTIVE_TIME_NANOS_UPDATER.getAndSet(this, 0);
640 }
641
642
643
644
645 protected long getLastActivityTimeNanos() {
646 return lastActivityTimeNanos;
647 }
648
649
650
651
652
653
654
655 protected int getAndIncrementIdleCycles() {
656 return CONSECUTIVE_IDLE_CYCLES_UPDATER.getAndIncrement(this);
657 }
658
659
660
661
662
663 protected void resetIdleCycles() {
664 CONSECUTIVE_IDLE_CYCLES_UPDATER.set(this, 0);
665 }
666
667
668
669
670
671
672
673 protected int getAndIncrementBusyCycles() {
674 return CONSECUTIVE_BUSY_CYCLES_UPDATER.getAndIncrement(this);
675 }
676
677
678
679
680
681 protected void resetBusyCycles() {
682 CONSECUTIVE_BUSY_CYCLES_UPDATER.set(this, 0);
683 }
684
685
686
687
688 protected boolean isSuspensionSupported() {
689 return supportSuspension;
690 }
691
692
693
694
    /**
     * The body of the event loop, executed on the executor thread. When it returns and the executor
     * cannot be suspended, shutdown processing starts. An error is logged if it returns normally
     * without {@link #confirmShutdown()} ever having been called.
     */
    protected abstract void run();
696
697
698
699
    /**
     * Hook invoked on the executor thread during termination, after the final
     * {@link #confirmShutdown()} pass and before the state becomes terminated. This implementation
     * does nothing.
     */
    protected void cleanup() {
        // NOOP
    }
703
704 protected void wakeup(boolean inEventLoop) {
705 if (!inEventLoop) {
706
707
708 taskQueue.offer(WAKEUP_TASK);
709 }
710 }
711
712 @Override
713 public boolean inEventLoop(Thread thread) {
714 return thread == this.thread;
715 }
716
717
718
719
720 public void addShutdownHook(final Runnable task) {
721 if (inEventLoop()) {
722 shutdownHooks.add(task);
723 } else {
724 execute(new Runnable() {
725 @Override
726 public void run() {
727 shutdownHooks.add(task);
728 }
729 });
730 }
731 }
732
733
734
735
736 public void removeShutdownHook(final Runnable task) {
737 if (inEventLoop()) {
738 shutdownHooks.remove(task);
739 } else {
740 execute(new Runnable() {
741 @Override
742 public void run() {
743 shutdownHooks.remove(task);
744 }
745 });
746 }
747 }
748
749 private boolean runShutdownHooks() {
750 boolean ran = false;
751
752 while (!shutdownHooks.isEmpty()) {
753 List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
754 shutdownHooks.clear();
755 for (Runnable task: copy) {
756 try {
757 runTask(task);
758 } catch (Throwable t) {
759 logger.warn("Shutdown hook raised an exception.", t);
760 } finally {
761 ran = true;
762 }
763 }
764 }
765
766 if (ran) {
767 lastExecutionTime = getCurrentTimeNanos();
768 }
769
770 return ran;
771 }
772
    /**
     * Shared implementation of {@link #shutdownGracefully(long, long, TimeUnit)} and {@link #shutdown()}:
     * moves the state to {@code shutdownState}, records the graceful-shutdown parameters, makes sure
     * a thread exists to carry out the shutdown, and wakes that thread up.
     *
     * @param quietPeriod   quiet period in nanoseconds, or {@code -1} to leave the stored value untouched
     * @param timeout       timeout in nanoseconds, or {@code -1} to leave the stored value untouched
     * @param shutdownState the target state ({@code ST_SHUTTING_DOWN} or {@code ST_SHUTDOWN})
     */
    private void shutdown0(long quietPeriod, long timeout, int shutdownState) {
        // Fast path: a shutdown is already in progress.
        if (isShuttingDown()) {
            return;
        }

        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        // Retry until this thread wins the state transition or sees that another thread already did.
        for (;;) {
            if (isShuttingDown()) {
                return;
            }
            int newState;
            wakeup = true;
            oldState = state;
            if (inEventLoop) {
                newState = shutdownState;
            } else {
                switch (oldState) {
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                    case ST_SUSPENDING:
                    case ST_SUSPENDED:
                        newState = shutdownState;
                        break;
                    default:
                        // Already in a shutdown-related state: keep it and do not wake the thread.
                        newState = oldState;
                        wakeup = false;
                }
            }
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                break;
            }
        }
        if (quietPeriod != -1) {
            gracefulShutdownQuietPeriod = quietPeriod;
        }
        if (timeout != -1) {
            gracefulShutdownTimeout = timeout;
        }

        // If no thread was running (not started or suspended), one is started to perform the shutdown.
        // A true result means starting the thread failed and the executor was marked terminated.
        if (ensureThreadStarted(oldState)) {
            return;
        }

        if (wakeup) {
            taskQueue.offer(WAKEUP_TASK);
            if (!addTaskWakesUp) {
                wakeup(inEventLoop);
            }
        }
    }
825
826 @Override
827 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
828 ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
829 if (timeout < quietPeriod) {
830 throw new IllegalArgumentException(
831 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
832 }
833 ObjectUtil.checkNotNull(unit, "unit");
834
835 shutdown0(unit.toNanos(quietPeriod), unit.toNanos(timeout), ST_SHUTTING_DOWN);
836 return terminationFuture();
837 }
838
839 @Override
840 public Future<?> terminationFuture() {
841 return terminationFuture;
842 }
843
844 @Override
845 @Deprecated
846 public void shutdown() {
847 shutdown0(-1, -1, ST_SHUTDOWN);
848 }
849
850 @Override
851 public boolean isShuttingDown() {
852 return state >= ST_SHUTTING_DOWN;
853 }
854
855 @Override
856 public boolean isShutdown() {
857 return state >= ST_SHUTDOWN;
858 }
859
860 @Override
861 public boolean isTerminated() {
862 return state == ST_TERMINATED;
863 }
864
865 @Override
866 public boolean isSuspended() {
867 int currentState = state;
868 return currentState == ST_SUSPENDED || currentState == ST_SUSPENDING;
869 }
870
871 @Override
872 public boolean trySuspend() {
873 if (supportSuspension) {
874 if (STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_SUSPENDING)) {
875 wakeup(inEventLoop());
876 return true;
877 } else if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_SUSPENDED)) {
878 return true;
879 }
880 int currentState = state;
881 return currentState == ST_SUSPENDED || currentState == ST_SUSPENDING;
882 }
883 return false;
884 }
885
886
887
888
889
890
891
892 protected boolean canSuspend() {
893 return canSuspend(state);
894 }
895
896
897
898
899
900
901
902
903
904
905 protected boolean canSuspend(int state) {
906 assert inEventLoop();
907 return supportSuspension && (state == ST_SUSPENDED || state == ST_SUSPENDING)
908 && !hasTasks() && nextScheduledTaskDeadlineNanos() == -1;
909 }
910
911
912
913
    /**
     * Runs one step of the shutdown protocol: cancels scheduled tasks, runs the remaining tasks and
     * the shutdown hooks, and decides whether the event loop may terminate now.
     *
     * @return {@code true} if the event loop can terminate; {@code false} if it has to keep running
     *         (also returned when no shutdown has been requested)
     * @throws IllegalStateException if a shutdown is in progress and the caller is not the event loop thread
     */
    protected boolean confirmShutdown() {
        if (!isShuttingDown()) {
            return false;
        }

        if (!inEventLoop()) {
            throw new IllegalStateException("must be invoked from an event loop");
        }

        cancelScheduledTasks();

        // Remember when the graceful shutdown began; the timeout is measured from here.
        if (gracefulShutdownStartTime == 0) {
            gracefulShutdownStartTime = getCurrentTimeNanos();
        }

        if (runAllTasks() || runShutdownHooks()) {
            if (isShutdown()) {
                // Already shut down: terminate without waiting for a quiet period.
                return true;
            }

            // Some work was done in this pass. With a zero quiet period that is enough to terminate;
            // otherwise leave a wake-up marker in the queue and ask to be called again.
            if (gracefulShutdownQuietPeriod == 0) {
                return true;
            }
            taskQueue.offer(WAKEUP_TASK);
            return false;
        }

        final long nanoTime = getCurrentTimeNanos();

        // Terminate if shut down, or if the graceful shutdown has exceeded its timeout.
        if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
            return true;
        }

        if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
            // Still inside the quiet period: leave a wake-up marker, sleep for 100 ms, then ask to be
            // called again so that newly added tasks are noticed.
            taskQueue.offer(WAKEUP_TASK);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Deliberately ignored; the interrupt status is not restored.
            }

            return false;
        }

        // Nothing ran for the whole quiet period: allow termination.
        return true;
    }
968
969 @Override
970 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
971 ObjectUtil.checkNotNull(unit, "unit");
972 if (inEventLoop()) {
973 throw new IllegalStateException("cannot await termination of the current thread");
974 }
975
976 threadLock.await(timeout, unit);
977
978 return isTerminated();
979 }
980
981 @Override
982 public void execute(Runnable task) {
983 execute0(task);
984 }
985
986 @Override
987 public void lazyExecute(Runnable task) {
988 lazyExecute0(task);
989 }
990
991 private void execute0(@Schedule Runnable task) {
992 ObjectUtil.checkNotNull(task, "task");
993 execute(task, wakesUpForTask(task));
994 }
995
996 private void lazyExecute0(@Schedule Runnable task) {
997 execute(ObjectUtil.checkNotNull(task, "task"), false);
998 }
999
1000 @Override
1001 void scheduleRemoveScheduled(final ScheduledFutureTask<?> task) {
1002 ObjectUtil.checkNotNull(task, "task");
1003 int currentState = state;
1004 if (supportSuspension && currentState == ST_SUSPENDED) {
1005
1006
1007
1008 execute(new Runnable() {
1009 @Override
1010 public void run() {
1011 task.run();
1012 if (canSuspend(ST_SUSPENDED)) {
1013
1014
1015 trySuspend();
1016 }
1017 }
1018 }, true);
1019 } else {
1020
1021 execute(task, false);
1022 }
1023 }
1024
1025 private void execute(Runnable task, boolean immediate) {
1026 boolean inEventLoop = inEventLoop();
1027 addTask(task);
1028 if (!inEventLoop) {
1029 startThread();
1030 if (isShutdown()) {
1031 boolean reject = false;
1032 try {
1033 if (removeTask(task)) {
1034 reject = true;
1035 }
1036 } catch (UnsupportedOperationException e) {
1037
1038
1039
1040 }
1041 if (reject) {
1042 reject();
1043 }
1044 }
1045 }
1046
1047 if (!addTaskWakesUp && immediate) {
1048 wakeup(inEventLoop);
1049 }
1050 }
1051
1052 @Override
1053 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
1054 throwIfInEventLoop("invokeAny");
1055 return super.invokeAny(tasks);
1056 }
1057
1058 @Override
1059 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
1060 throws InterruptedException, ExecutionException, TimeoutException {
1061 throwIfInEventLoop("invokeAny");
1062 return super.invokeAny(tasks, timeout, unit);
1063 }
1064
1065 @Override
1066 public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
1067 throws InterruptedException {
1068 throwIfInEventLoop("invokeAll");
1069 return super.invokeAll(tasks);
1070 }
1071
1072 @Override
1073 public <T> List<java.util.concurrent.Future<T>> invokeAll(
1074 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
1075 throwIfInEventLoop("invokeAll");
1076 return super.invokeAll(tasks, timeout, unit);
1077 }
1078
1079 private void throwIfInEventLoop(String method) {
1080 if (inEventLoop()) {
1081 throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
1082 }
1083 }
1084
1085
1086
1087
1088
1089
1090 public final ThreadProperties threadProperties() {
1091 ThreadProperties threadProperties = this.threadProperties;
1092 if (threadProperties == null) {
1093 Thread thread = this.thread;
1094 if (thread == null) {
1095 assert !inEventLoop();
1096 submit(NOOP_TASK).syncUninterruptibly();
1097 thread = this.thread;
1098 assert thread != null;
1099 }
1100
1101 threadProperties = new DefaultThreadProperties(thread);
1102 if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
1103 threadProperties = this.threadProperties;
1104 }
1105 }
1106
1107 return threadProperties;
1108 }
1109
1110
1111
1112
    /**
     * Marker interface that adds nothing to {@link LazyRunnable}.
     *
     * @deprecated presumably superseded by {@link LazyRunnable} itself, which this interface merely
     *             extends; confirm against the callers before removing
     */
    @Deprecated
    protected interface NonWakeupRunnable extends LazyRunnable { }
1115
1116
1117
1118
1119
    /**
     * Decides whether {@link #execute(Runnable)} should wake up the event loop for the given task.
     * This implementation always answers {@code true}; subclasses may override it.
     *
     * @param task the task that is about to be queued
     * @return {@code true} if the event loop should be woken up for {@code task}
     */
    protected boolean wakesUpForTask(Runnable task) {
        return true;
    }
1123
    /**
     * Signals that the executor no longer accepts tasks. This method never returns normally.
     *
     * @throws RejectedExecutionException always
     */
    protected static void reject() {
        throw new RejectedExecutionException("event executor terminated");
    }
1127
1128
1129
1130
1131
1132
1133 protected final void reject(Runnable task) {
1134 rejectedExecutionHandler.rejected(task, this);
1135 }
1136
1137
1138
    // One second in nanoseconds. Used by delayNanos(long) and deadlineNanos() as the fallback delay
    // when no scheduled task is pending.
    private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
1140
1141 private void startThread() {
1142 int currentState = state;
1143 if (currentState == ST_NOT_STARTED || currentState == ST_SUSPENDED) {
1144 if (STATE_UPDATER.compareAndSet(this, currentState, ST_STARTED)) {
1145 resetIdleCycles();
1146 resetBusyCycles();
1147 boolean success = false;
1148 try {
1149 doStartThread();
1150 success = true;
1151 } finally {
1152 if (!success) {
1153 STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
1154 }
1155 }
1156 }
1157 }
1158 }
1159
1160 private boolean ensureThreadStarted(int oldState) {
1161 if (oldState == ST_NOT_STARTED || oldState == ST_SUSPENDED) {
1162 try {
1163 doStartThread();
1164 } catch (Throwable cause) {
1165 STATE_UPDATER.set(this, ST_TERMINATED);
1166 terminationFuture.tryFailure(cause);
1167
1168 if (!(cause instanceof Exception)) {
1169
1170 PlatformDependent.throwException(cause);
1171 }
1172 return true;
1173 }
1174 }
1175 return false;
1176 }
1177
    /**
     * Submits the event loop runnable to the underlying {@link Executor}. The runnable attaches the
     * executing thread to this executor, calls {@link #run()} (repeatedly, if a suspension attempt is
     * abandoned), and afterwards either suspends the executor or drives the complete shutdown
     * sequence up to termination.
     */
    private void doStartThread() {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // Released in the innermost finally below.
                processingLock.lock();
                assert thread == null;
                thread = Thread.currentThread();
                // Apply an interrupt that was requested while no thread was attached.
                if (interrupted) {
                    thread.interrupt();
                    interrupted = false;
                }
                boolean success = false;
                Throwable unexpectedException = null;
                updateLastExecutionTime();
                boolean suspend = false;
                try {
                    for (;;) {
                        SingleThreadEventExecutor.this.run();
                        success = true;

                        int currentState = state;
                        if (canSuspend(currentState)) {
                            if (!STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
                                    ST_SUSPENDING, ST_SUSPENDED)) {
                                // The transition to suspended failed: run the loop again.
                                continue;
                            }

                            if (!canSuspend(ST_SUSPENDED) && STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
                                        ST_SUSPENDED, ST_STARTED)) {
                                // Suspension is no longer possible (e.g. work arrived) and this thread
                                // managed to switch back to started: keep running the loop.
                                continue;
                            }
                            suspend = true;
                        }
                        break;
                    }
                } catch (Throwable t) {
                    unexpectedException = t;
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // Leaving the loop without suspending means the executor shuts down.
                    boolean shutdown = !suspend;
                    if (shutdown) {
                        for (;;) {
                            // Move to shutting-down unless the state is already there or beyond.
                            int oldState = state;
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
                        if (success && gracefulShutdownStartTime == 0) {
                            // run() returned normally but confirmShutdown() never ran
                            // (it is the only place that sets gracefulShutdownStartTime).
                            if (logger.isErrorEnabled()) {
                                logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                        SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                        "be called before run() implementation terminates.");
                            }
                        }
                    }

                    try {
                        if (shutdown) {
                            // Phase 1: keep running remaining tasks and shutdown hooks until
                            // confirmShutdown() allows termination.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }

                            // Phase 2: switch to shut-down; from here on offerTask() rejects new tasks.
                            for (;;) {
                                int currentState = state;
                                if (currentState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                        SingleThreadEventExecutor.this, currentState, ST_SHUTDOWN)) {
                                    break;
                                }
                            }

                            // Phase 3: one final pass over whatever was queued before the switch.
                            confirmShutdown();
                        }
                    } finally {
                        try {
                            if (shutdown) {
                                try {
                                    cleanup();
                                } finally {
                                    // Clear this thread's FastThreadLocals before termination is
                                    // announced to anyone waiting on it.
                                    FastThreadLocal.removeAll();

                                    STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                    threadLock.countDown();
                                    int numUserTasks = drainTasks();
                                    if (numUserTasks > 0 && logger.isWarnEnabled()) {
                                        logger.warn("An event executor terminated with " +
                                                "non-empty task queue (" + numUserTasks + ')');
                                    }
                                    if (unexpectedException == null) {
                                        terminationFuture.setSuccess(null);
                                    } else {
                                        terminationFuture.setFailure(unexpectedException);
                                    }
                                }
                            } else {
                                // Suspending: this thread is released, so clear its FastThreadLocals.
                                FastThreadLocal.removeAll();

                                // The cached properties describe this thread; drop them so a later
                                // thread gets fresh ones.
                                threadProperties = null;
                            }
                        } finally {
                            thread = null;
                            // Allow a subsequently started loop runnable to proceed.
                            processingLock.unlock();
                        }
                    }
                }
            }
        });
    }
1307
1308 final int drainTasks() {
1309 int numTasks = 0;
1310 for (;;) {
1311 Runnable runnable = taskQueue.poll();
1312 if (runnable == null) {
1313 break;
1314 }
1315
1316
1317 if (WAKEUP_TASK != runnable) {
1318 numTasks++;
1319 }
1320 }
1321 return numTasks;
1322 }
1323
1324 private static final class DefaultThreadProperties implements ThreadProperties {
1325 private final Thread t;
1326
1327 DefaultThreadProperties(Thread t) {
1328 this.t = t;
1329 }
1330
1331 @Override
1332 public State state() {
1333 return t.getState();
1334 }
1335
1336 @Override
1337 public int priority() {
1338 return t.getPriority();
1339 }
1340
1341 @Override
1342 public boolean isInterrupted() {
1343 return t.isInterrupted();
1344 }
1345
1346 @Override
1347 public boolean isDaemon() {
1348 return t.isDaemon();
1349 }
1350
1351 @Override
1352 public String name() {
1353 return t.getName();
1354 }
1355
1356 @Override
1357 public long id() {
1358 return t.getId();
1359 }
1360
1361 @Override
1362 public StackTraceElement[] stackTrace() {
1363 return t.getStackTrace();
1364 }
1365
1366 @Override
1367 public boolean isAlive() {
1368 return t.isAlive();
1369 }
1370 }
1371 }