1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.SystemPropertyUtil;
19 import io.netty5.util.internal.ThreadExecutorMap;
20 import io.netty5.util.internal.logging.InternalLogger;
21 import io.netty5.util.internal.logging.InternalLoggerFactory;
22 import org.jetbrains.annotations.Async.Execute;
23 import org.jetbrains.annotations.Async.Schedule;
24
25 import java.lang.Thread.State;
26 import java.util.ArrayList;
27 import java.util.LinkedHashSet;
28 import java.util.List;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
40
41 import static java.util.Objects.requireNonNull;
42
43
44
45
46
47 public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
48
49 protected static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
50 SystemPropertyUtil.getInt("io.netty5.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
51
52 private static final InternalLogger logger =
53 InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
54
55 private static final int ST_NOT_STARTED = 1;
56 private static final int ST_STARTED = 2;
57 private static final int ST_SHUTTING_DOWN = 3;
58 private static final int ST_SHUTDOWN = 4;
59 private static final int ST_TERMINATED = 5;
60
61 private static final Runnable WAKEUP_TASK = () -> {
62
63 };
64 private static final Runnable NOOP_TASK = () -> {
65
66 };
67
68 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
69 AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
70 private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
71 AtomicReferenceFieldUpdater.newUpdater(
72 SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
73
74 private final Queue<Runnable> taskQueue;
75
76 private volatile Thread thread;
77 @SuppressWarnings("unused")
78 private volatile ThreadProperties threadProperties;
79 private final Executor executor;
80 private volatile boolean interrupted;
81
82 private final CountDownLatch threadLock = new CountDownLatch(1);
83 private final Set<Runnable> shutdownHooks = new LinkedHashSet<>();
84 private final boolean addTaskWakesUp;
85 private final RejectedExecutionHandler rejectedExecutionHandler;
86
87 private long lastExecutionTime;
88
89 @SuppressWarnings({ "FieldMayBeFinal", "unused" })
90 private volatile int state = ST_NOT_STARTED;
91
92 private volatile long gracefulShutdownQuietPeriod;
93 private volatile long gracefulShutdownTimeout;
94 private long gracefulShutdownStartTime;
95
96 private final Promise<Void> terminationFuture = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
97
98
99
100
/**
 * Creates an executor that uses a {@link DefaultThreadFactory} built for
 * {@link SingleThreadEventExecutor} to obtain its thread.
 */
public SingleThreadEventExecutor() {
    this(new DefaultThreadFactory(SingleThreadEventExecutor.class));
}
104
105
106
107
108
109
/**
 * Creates an executor whose thread is supplied by the given factory, wrapped in a
 * {@link ThreadPerTaskExecutor}.
 *
 * @param threadFactory the factory handed to the {@link ThreadPerTaskExecutor}
 */
public SingleThreadEventExecutor(ThreadFactory threadFactory) {
    this(new ThreadPerTaskExecutor(threadFactory));
}
113
114
115
116
117
118
119
120
/**
 * Creates an executor whose thread is supplied by the given factory, wrapped in a
 * {@link ThreadPerTaskExecutor}, with an explicit queue bound and rejection handler.
 *
 * @param threadFactory   the factory handed to the {@link ThreadPerTaskExecutor}
 * @param maxPendingTasks the requested bound for the task queue
 * @param rejectedHandler the handler invoked when a task cannot be queued
 */
public SingleThreadEventExecutor(
        ThreadFactory threadFactory, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
    this(new ThreadPerTaskExecutor(threadFactory), maxPendingTasks, rejectedHandler);
}
125
126
127
128
129
130
/**
 * Creates an executor that runs on the given {@link Executor}, bounded by
 * {@link #DEFAULT_MAX_PENDING_EXECUTOR_TASKS} and using the handler returned by
 * {@code RejectedExecutionHandlers.reject()}.
 *
 * @param executor the executor that supplies the thread for the event loop
 */
public SingleThreadEventExecutor(Executor executor) {
    this(executor, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
}
134
135
136
137
138
139
140
141
142 public SingleThreadEventExecutor(Executor executor, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
143 this.executor = ThreadExecutorMap.apply(executor, this);
144 taskQueue = newTaskQueue(Math.max(16, maxPendingTasks));
145 addTaskWakesUp = taskQueue instanceof BlockingQueue;
146 rejectedExecutionHandler = requireNonNull(rejectedHandler, "rejectedHandler");
147 }
148
149
150
151
152
153
154
155
156
157
158
159
160
161 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
162 return new LinkedBlockingQueue<>(maxPendingTasks);
163 }
164
165
166
167
168 protected final void interruptThread() {
169 Thread currentThread = thread;
170 if (currentThread == null) {
171 interrupted = true;
172 } else {
173 currentThread.interrupt();
174 }
175 }
176
177
178
179
180
181
182 protected final Runnable pollTask() {
183 assert inEventLoop();
184
185 for (;;) {
186 Runnable task = taskQueue.poll();
187 if (task == WAKEUP_TASK) {
188 continue;
189 }
190 return task;
191 }
192 }
193
194
195
196
197
198
199
200
201
202
203
204
205 protected final Runnable takeTask() {
206 assert inEventLoop();
207 if (!(taskQueue instanceof BlockingQueue)) {
208 throw new UnsupportedOperationException();
209 }
210
211 BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
212 for (;;) {
213 RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
214 if (scheduledTask == null) {
215 Runnable task = null;
216 try {
217 task = taskQueue.take();
218 if (task == WAKEUP_TASK) {
219 task = null;
220 }
221 } catch (InterruptedException e) {
222
223 }
224 return task;
225 } else {
226 long delayNanos = scheduledTask.delayNanos();
227 Runnable task = null;
228 if (delayNanos > 0) {
229 try {
230 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
231 } catch (InterruptedException e) {
232
233 return null;
234 }
235 }
236 if (task == null) {
237
238
239
240
241 fetchFromScheduledTaskQueue();
242 task = taskQueue.poll();
243 }
244
245 if (task != null) {
246 return task;
247 }
248 }
249 }
250 }
251
252 private boolean fetchFromScheduledTaskQueue() {
253 long nanoTime = getCurrentTimeNanos();
254 RunnableScheduledFuture<?> scheduledTask = pollScheduledTask(nanoTime);
255 while (scheduledTask != null) {
256 if (!taskQueue.offer(scheduledTask)) {
257
258 schedule(scheduledTask);
259 return false;
260 }
261 scheduledTask = pollScheduledTask(nanoTime);
262 }
263 return true;
264 }
265
266
267
268
269 protected final boolean hasTasks() {
270 return !taskQueue.isEmpty();
271 }
272
273
274
275
276 public final int pendingTasks() {
277 return taskQueue.size();
278 }
279
280
281
282
283
284 private void addTask(Runnable task) {
285 if (!offerTask(task)) {
286 rejectedExecutionHandler.rejected(task, this);
287 }
288 }
289
290
291
292
293 protected final boolean offerTask(Runnable task) {
294 requireNonNull(task, "task");
295 if (isShutdown()) {
296 reject();
297 }
298 return taskQueue.offer(task);
299 }
300
301
302
303
304 protected final boolean removeTask(Runnable task) {
305 return taskQueue.remove(task);
306 }
307
308
309
310
311
312
313
314
315 private boolean runAllTasks() {
316 boolean fetchedAll;
317 do {
318 fetchedAll = fetchFromScheduledTaskQueue();
319 Runnable task = pollTask();
320 if (task == null) {
321 return false;
322 }
323
324 do {
325 try {
326 runTask(task);
327 } catch (Throwable t) {
328 logger.warn("A task raised an exception.", t);
329 }
330 } while ((task = pollTask()) != null);
331 } while (!fetchedAll);
332
333 updateLastExecutionTime();
334 return true;
335 }
336
337 private void runTask(@Execute Runnable task) {
338 task.run();
339 }
340
341
342
343
344
345
346
347
348 protected int runAllTasks(int maxTasks) {
349 assert inEventLoop();
350 boolean fetchedAll;
351 int processedTasks = 0;
352 do {
353 fetchedAll = fetchFromScheduledTaskQueue();
354 for (; processedTasks < maxTasks; processedTasks++) {
355 Runnable task = pollTask();
356 if (task == null) {
357 break;
358 }
359
360 try {
361 runTask(task);
362 } catch (Throwable t) {
363 logger.warn("A task raised an exception.", t);
364 }
365 }
366 } while (!fetchedAll && processedTasks < maxTasks);
367
368 if (processedTasks > 0) {
369
370 updateLastExecutionTime();
371 }
372 return processedTasks;
373 }
374
375
376
377
378
379
380 protected final long delayNanos(long currentTimeNanos) {
381 assert inEventLoop();
382 currentTimeNanos -= START_TIME;
383 RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
384 if (scheduledTask == null) {
385 return SCHEDULE_PURGE_INTERVAL;
386 }
387
388 return scheduledTask.delayNanos(currentTimeNanos);
389 }
390
391
392
393
394
395
396
397 protected final long deadlineNanos() {
398 assert inEventLoop();
399 RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
400 return scheduledTask == null ? -1 : scheduledTask.deadlineNanos();
401 }
402
403
404
405
406
407
408
409
410
411 protected final void updateLastExecutionTime() {
412 assert inEventLoop();
413 lastExecutionTime = getCurrentTimeNanos();
414 }
415
416
417
418
419
420
421
422
423
424 protected void run() {
425 assert inEventLoop();
426 do {
427 Runnable task = takeTask();
428 if (task != null) {
429 runTask(task);
430 updateLastExecutionTime();
431 }
432 } while (!confirmShutdown());
433 }
434
435
436
437
438 protected void cleanup() {
439
440 assert inEventLoop();
441 }
442
443 protected void wakeup(boolean inEventLoop) {
444 if (!inEventLoop) {
445
446
447 taskQueue.offer(WAKEUP_TASK);
448 }
449 }
450
451 @Override
452 public final boolean inEventLoop(Thread thread) {
453 return thread == this.thread;
454 }
455
456
457
458
459 public final void addShutdownHook(final Runnable task) {
460 if (inEventLoop()) {
461 shutdownHooks.add(task);
462 } else {
463 execute(() -> shutdownHooks.add(task));
464 }
465 }
466
467
468
469
470 public final void removeShutdownHook(final Runnable task) {
471 if (inEventLoop()) {
472 shutdownHooks.remove(task);
473 } else {
474 execute(() -> shutdownHooks.remove(task));
475 }
476 }
477
478 private boolean runShutdownHooks() {
479 boolean ran = false;
480
481 while (!shutdownHooks.isEmpty()) {
482 List<Runnable> copy = new ArrayList<>(shutdownHooks);
483 shutdownHooks.clear();
484 for (Runnable task: copy) {
485 try {
486 runTask(task);
487 } catch (Throwable t) {
488 logger.warn("Shutdown hook raised an exception.", t);
489 } finally {
490 ran = true;
491 }
492 }
493 }
494
495 if (ran) {
496 updateLastExecutionTime();
497 }
498
499 return ran;
500 }
501
502 @Override
503 public final Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
504 if (quietPeriod < 0) {
505 throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
506 }
507 if (timeout < quietPeriod) {
508 throw new IllegalArgumentException(
509 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
510 }
511 requireNonNull(unit, "unit");
512
513 if (isShuttingDown()) {
514 return terminationFuture();
515 }
516
517 boolean inEventLoop = inEventLoop();
518 boolean wakeup;
519 int oldState;
520 for (;;) {
521 if (isShuttingDown()) {
522 return terminationFuture();
523 }
524 int newState;
525 wakeup = true;
526 oldState = state;
527 if (inEventLoop) {
528 newState = ST_SHUTTING_DOWN;
529 } else {
530 switch (oldState) {
531 case ST_NOT_STARTED:
532 case ST_STARTED:
533 newState = ST_SHUTTING_DOWN;
534 break;
535 default:
536 newState = oldState;
537 wakeup = false;
538 }
539 }
540 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
541 break;
542 }
543 }
544 gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
545 gracefulShutdownTimeout = unit.toNanos(timeout);
546
547 if (ensureThreadStarted(oldState)) {
548 return terminationFuture.asFuture();
549 }
550
551 if (wakeup) {
552 taskQueue.offer(WAKEUP_TASK);
553 if (!addTaskWakesUp) {
554 wakeup(inEventLoop);
555 }
556 }
557
558 return terminationFuture();
559 }
560
561 @Override
562 public final Future<Void> terminationFuture() {
563 return terminationFuture.asFuture();
564 }
565
566 @Override
567 public final boolean isShuttingDown() {
568 return state >= ST_SHUTTING_DOWN;
569 }
570
571 @Override
572 public final boolean isShutdown() {
573 return state >= ST_SHUTDOWN;
574 }
575
576 @Override
577 public final boolean isTerminated() {
578 return state == ST_TERMINATED;
579 }
580
581
582
583
584
585
586 protected final boolean confirmShutdown() {
587 return confirmShutdown0();
588 }
589
590 boolean confirmShutdown0() {
591 assert inEventLoop();
592
593 if (!isShuttingDown()) {
594 return false;
595 }
596
597 cancelScheduledTasks();
598
599 if (gracefulShutdownStartTime == 0) {
600 gracefulShutdownStartTime = getCurrentTimeNanos();
601 }
602
603 if (runAllTasks() || runShutdownHooks()) {
604 if (isShutdown()) {
605
606 return true;
607 }
608
609
610
611
612 if (gracefulShutdownQuietPeriod == 0) {
613 return true;
614 }
615 taskQueue.offer(WAKEUP_TASK);
616 return false;
617 }
618
619 final long nanoTime = getCurrentTimeNanos();
620
621 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
622 return true;
623 }
624
625 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
626
627
628 taskQueue.offer(WAKEUP_TASK);
629 try {
630 Thread.sleep(100);
631 } catch (InterruptedException e) {
632
633 }
634
635 return false;
636 }
637
638
639
640 return true;
641 }
642
643 @Override
644 public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
645 requireNonNull(unit, "unit");
646
647 if (inEventLoop()) {
648 throw new IllegalStateException("cannot await termination of the current thread");
649 }
650
651 threadLock.await(timeout, unit);
652
653 return isTerminated();
654 }
655
656 @Override
657 public void execute(@Schedule Runnable task) {
658 requireNonNull(task, "task");
659
660 boolean inEventLoop = inEventLoop();
661 addTask(task);
662 if (!inEventLoop) {
663 startThread();
664 if (isShutdown()) {
665 boolean reject = false;
666 try {
667 if (removeTask(task)) {
668 reject = true;
669 }
670 } catch (UnsupportedOperationException e) {
671
672
673
674 }
675 if (reject) {
676 reject();
677 }
678 }
679 }
680
681 if (!addTaskWakesUp && wakesUpForTask(task)) {
682 wakeup(inEventLoop);
683 }
684 }
685
686
687
688
689
690
691
692
693 public final ThreadProperties threadProperties() throws InterruptedException {
694 ThreadProperties threadProperties = this.threadProperties;
695 if (threadProperties == null) {
696 Thread thread = this.thread;
697 if (thread == null) {
698 assert !inEventLoop();
699 submit(NOOP_TASK).asStage().sync();
700 thread = this.thread;
701 assert thread != null;
702 }
703
704 threadProperties = new DefaultThreadProperties(thread);
705 if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
706 threadProperties = this.threadProperties;
707 }
708 }
709
710 return threadProperties;
711 }
712
713
714
715
716
717 protected boolean wakesUpForTask(@SuppressWarnings("unused") Runnable task) {
718 return true;
719 }
720
721 protected static void reject() {
722 throw new RejectedExecutionException("event executor terminated");
723 }
724
725
726
727 private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
728
729 private void startThread() {
730 if (state == ST_NOT_STARTED) {
731 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
732 boolean success = false;
733 try {
734 doStartThread();
735 success = true;
736 } finally {
737 if (!success) {
738 STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
739 }
740 }
741 }
742 }
743 }
744
745 private boolean ensureThreadStarted(int oldState) {
746 if (oldState == ST_NOT_STARTED) {
747 try {
748 doStartThread();
749 } catch (Throwable cause) {
750 STATE_UPDATER.set(this, ST_TERMINATED);
751 terminationFuture.tryFailure(cause);
752
753 if (cause instanceof Error) {
754
755 throw cause;
756 }
757 return true;
758 }
759 }
760 return false;
761 }
762
763 private void doStartThread() {
764 assert thread == null;
765 executor.execute(() -> {
766 thread = Thread.currentThread();
767 if (interrupted) {
768 thread.interrupt();
769 }
770
771 boolean success = false;
772 updateLastExecutionTime();
773 try {
774 run();
775 success = true;
776 } catch (Throwable t) {
777 logger.warn("Unexpected exception from an event executor: ", t);
778 } finally {
779 for (;;) {
780 int oldState = state;
781 if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
782 this, oldState, ST_SHUTTING_DOWN)) {
783 break;
784 }
785 }
786
787
788 if (success && gracefulShutdownStartTime == 0) {
789 if (logger.isErrorEnabled()) {
790 logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
791 SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
792 "be called before run() implementation terminates.");
793 }
794 }
795
796 try {
797
798
799
800 for (;;) {
801 if (confirmShutdown()) {
802 break;
803 }
804 }
805
806
807
808 for (;;) {
809 int oldState = state;
810 if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
811 this, oldState, ST_SHUTDOWN)) {
812 break;
813 }
814 }
815
816
817
818 confirmShutdown();
819 } finally {
820 try {
821 cleanup();
822 } finally {
823
824
825
826
827 FastThreadLocal.removeAll();
828
829 STATE_UPDATER.set(this, ST_TERMINATED);
830 threadLock.countDown();
831 int numUserTasks = drainTasks();
832 if (numUserTasks > 0 && logger.isWarnEnabled()) {
833 logger.warn("An event executor terminated with " +
834 "non-empty task queue (" + numUserTasks + ')');
835 }
836 terminationFuture.setSuccess(null);
837 }
838 }
839 }
840 });
841 }
842
843 final int drainTasks() {
844 int numTasks = 0;
845 for (;;) {
846 Runnable runnable = taskQueue.poll();
847 if (runnable == null) {
848 break;
849 }
850
851
852 if (WAKEUP_TASK != runnable) {
853 numTasks++;
854 }
855 }
856 return numTasks;
857 }
858
859 private static final class DefaultThreadProperties implements ThreadProperties {
860 private final Thread t;
861
862 DefaultThreadProperties(Thread t) {
863 this.t = t;
864 }
865
866 @Override
867 public State state() {
868 return t.getState();
869 }
870
871 @Override
872 public int priority() {
873 return t.getPriority();
874 }
875
876 @Override
877 public boolean isInterrupted() {
878 return t.isInterrupted();
879 }
880
881 @Override
882 public boolean isDaemon() {
883 return t.isDaemon();
884 }
885
886 @Override
887 public String name() {
888 return t.getName();
889 }
890
891 @Override
892 public long id() {
893 return t.getId();
894 }
895
896 @Override
897 public StackTraceElement[] stackTrace() {
898 return t.getStackTrace();
899 }
900
901 @Override
902 public boolean isAlive() {
903 return t.isAlive();
904 }
905 }
906 }