1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.SystemPropertyUtil;
20 import io.netty.util.internal.logging.InternalLogger;
21 import io.netty.util.internal.logging.InternalLoggerFactory;
22
23 import java.lang.Thread.State;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.LinkedHashSet;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.Set;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.LinkedBlockingQueue;
34 import java.util.concurrent.RejectedExecutionException;
35 import java.util.concurrent.Semaphore;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.TimeoutException;
39 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40
41
42
43
44
45 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
46
/**
 * Default cap on queued tasks: read from the {@code io.netty.eventexecutor.maxPendingTasks} system property
 * (second argument is presumably the fallback when the property is unset), and never less than 16.
 */
static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
        SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

private static final InternalLogger logger =
        InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);

// Lifecycle states. The numeric order matters: isShuttingDown()/isShutdown() use >= comparisons.
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;

/** No-op marker that wakeup() offers to the task queue; pollTask() skips it. */
private static final Runnable WAKEUP_TASK = new Runnable() {
    @Override
    public void run() {
        // Intentionally empty.
    }
};

// Looks the field up reflectively by name, so the field below must keep the name "state".
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");

private final EventExecutorGroup parent;
private final Queue<Runnable> taskQueue;
// Created by the ThreadFactory in the constructor; started later by startThread() or a shutdown call.
private final Thread thread;
private final ThreadProperties threadProperties;
// Starts with zero permits; one permit is released when the thread terminates (see awaitTermination()).
private final Semaphore threadLock = new Semaphore(0);
// Plain (non-concurrent) set: add/removeShutdownHook() route every mutation through the event loop thread.
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp;
private final int maxPendingTasks;
private final RejectedExecutionHandler rejectedExecutionHandler;

// ScheduledFutureTask.nanoTime() of the latest task/hook execution; compared to the quiet period on shutdown.
private long lastExecutionTime;

@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;

// Both stored in nanoseconds by shutdownGracefully().
private volatile long gracefulShutdownQuietPeriod;
private volatile long gracefulShutdownTimeout;
// Set on the first confirmShutdown() call; 0 means confirmShutdown() has not run yet.
private long gracefulShutdownStartTime;

// Completed successfully by the executor thread as the very last step of termination.
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
89
90
91
92
93
94
95
96
97
/**
 * Creates an executor that uses {@link #DEFAULT_MAX_PENDING_EXECUTOR_TASKS} as the task limit and
 * {@code RejectedExecutionHandlers.reject()} as the rejection handler.
 *
 * @param parent          the {@link EventExecutorGroup} returned by {@link #parent()}
 * @param threadFactory   the factory used to create the single executor thread
 * @param addTaskWakesUp  when {@code true}, {@link #execute(Runnable)} does not call
 *                        {@link #wakeup(boolean)} after adding a task
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS,
            RejectedExecutionHandlers.reject());
}
103
104
105
106
107
108
109
110
111
112
113
/**
 * Creates a new instance and the (not yet started) thread that will drive it.
 *
 * <p>All arguments are validated before any resource is created, and a {@code null} thread returned by
 * the factory is rejected immediately instead of surfacing later as a {@link NullPointerException}
 * from {@code thread.start()} after the state has already been advanced.
 *
 * @param parent           the {@link EventExecutorGroup} returned by {@link #parent()}
 * @param threadFactory    the factory used to create the single executor thread
 * @param addTaskWakesUp   when {@code true}, {@link #execute(Runnable)} does not call
 *                         {@link #wakeup(boolean)} after adding a task
 * @param maxPendingTasks  the task limit handed to {@link #newTaskQueue()}; values below 16 are raised to 16
 * @param rejectedHandler  the handler invoked by {@link #addTask(Runnable)} when the queue refuses a task
 * @throws NullPointerException if {@code threadFactory} or {@code rejectedHandler} is {@code null}, or if
 *                              {@code threadFactory.newThread(...)} returns {@code null}
 */
@SuppressWarnings("deprecation")
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks,
        RejectedExecutionHandler rejectedHandler) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    // Fail fast: validate the handler before a thread or queue is created.
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");

    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;

    final Thread newThread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // Force the state to at least ST_SHUTTING_DOWN, whatever made run() return.
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // run() returned normally although confirmShutdown() was never invoked.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error(
                            "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Keep draining tasks and shutdown hooks until shutdown is confirmed.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        // Unblocks awaitTermination() callers.
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
    // ThreadFactory.newThread is allowed to return null when it refuses to create a thread.
    if (newThread == null) {
        throw new NullPointerException("threadFactory.newThread() returned null");
    }
    thread = newThread;
    threadProperties = new DefaultThreadProperties(thread);
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    taskQueue = newTaskQueue();
}
181
182
183
184
/**
 * Creates the task queue by delegating to {@link #newTaskQueue(int)} with this executor's
 * {@code maxPendingTasks}. Called from the constructor.
 *
 * @deprecated use {@link #newTaskQueue(int)}.
 */
@Deprecated
protected Queue<Runnable> newTaskQueue() {
    return newTaskQueue(maxPendingTasks);
}
189
190
191
192
193
194
195
/**
 * Creates the queue that holds pending tasks. This implementation returns a
 * {@link LinkedBlockingQueue} bounded to {@code maxPendingTasks}. Note that {@link #takeTask()}
 * only supports queues that implement {@link BlockingQueue}.
 *
 * @param maxPendingTasks the capacity of the returned queue
 * @return a new, empty task queue
 */
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
199
/** Returns the {@link EventExecutorGroup} passed to the constructor (may be whatever the caller supplied). */
@Override
public EventExecutorGroup parent() {
    return parent;
}
204
205
206
207
/**
 * Interrupts the executor thread. {@link #takeTask()} catches the resulting
 * {@link InterruptedException} and returns instead of propagating it.
 */
protected void interruptThread() {
    thread.interrupt();
}
211
212
213
214
215 protected Runnable pollTask() {
216 assert inEventLoop();
217 for (;;) {
218 Runnable task = taskQueue.poll();
219 if (task == WAKEUP_TASK) {
220 continue;
221 }
222 return task;
223 }
224 }
225
226
227
228
229
230
231
232
233
234
235 protected Runnable takeTask() {
236 assert inEventLoop();
237 if (!(taskQueue instanceof BlockingQueue)) {
238 throw new UnsupportedOperationException();
239 }
240
241 BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
242 for (;;) {
243 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
244 if (scheduledTask == null) {
245 Runnable task = null;
246 try {
247 task = taskQueue.take();
248 if (task == WAKEUP_TASK) {
249 task = null;
250 }
251 } catch (InterruptedException e) {
252
253 }
254 return task;
255 } else {
256 long delayNanos = scheduledTask.delayNanos();
257 Runnable task = null;
258 if (delayNanos > 0) {
259 try {
260 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
261 } catch (InterruptedException e) {
262 return null;
263 }
264 }
265 if (task == null) {
266
267
268
269
270 fetchFromScheduledTaskQueue();
271 task = taskQueue.poll();
272 }
273
274 if (task != null) {
275 return task;
276 }
277 }
278 }
279 }
280
281 private boolean fetchFromScheduledTaskQueue() {
282 long nanoTime = AbstractScheduledEventExecutor.nanoTime();
283 Runnable scheduledTask = pollScheduledTask(nanoTime);
284 while (scheduledTask != null) {
285 if (!taskQueue.offer(scheduledTask)) {
286
287 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
288 return false;
289 }
290 scheduledTask = pollScheduledTask(nanoTime);
291 }
292 return true;
293 }
294
295
296
297
/**
 * Returns the head of the task queue without removing it, or {@code null} if the queue is empty.
 * Asserts that the caller is the event loop thread. The head may be the internal wake-up marker.
 */
protected Runnable peekTask() {
    assert inEventLoop();
    return taskQueue.peek();
}
302
303
304
305
/**
 * Returns {@code true} if the task queue is not empty. Asserts that the caller is the event loop thread.
 */
protected boolean hasTasks() {
    assert inEventLoop();
    return !taskQueue.isEmpty();
}
310
311
312
313
314
315
316
/**
 * Returns the current size of the task queue. Because {@link #wakeup(boolean)} offers its marker
 * to the same queue, the count can include wake-up markers.
 */
public int pendingTasks() {
    return taskQueue.size();
}
320
321
322
323
324
325 protected void addTask(Runnable task) {
326 if (task == null) {
327 throw new NullPointerException("task");
328 }
329 if (!offerTask(task)) {
330 rejectedExecutionHandler.rejected(task, this);
331 }
332 }
333
/**
 * Offers {@code task} to the task queue.
 *
 * @return the result of {@code taskQueue.offer(task)}
 * @throws RejectedExecutionException (via {@link #reject()}) if the executor is already shut down
 */
final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}
340
341
342
343
/**
 * Removes {@code task} from the task queue.
 *
 * @return {@code true} if the queue contained the task and it was removed
 * @throws NullPointerException if {@code task} is {@code null}
 */
protected boolean removeTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    return taskQueue.remove(task);
}
350
351
352
353
354
355
356 protected boolean runAllTasks() {
357 boolean fetchedAll;
358 do {
359 fetchedAll = fetchFromScheduledTaskQueue();
360 Runnable task = pollTask();
361 if (task == null) {
362 return false;
363 }
364
365 for (;;) {
366 try {
367 task.run();
368 } catch (Throwable t) {
369 logger.warn("A task raised an exception.", t);
370 }
371
372 task = pollTask();
373 if (task == null) {
374 break;
375 }
376 }
377 } while (!fetchedAll);
378
379 lastExecutionTime = ScheduledFutureTask.nanoTime();
380 return true;
381 }
382
383
384
385
386
387 protected boolean runAllTasks(long timeoutNanos) {
388 fetchFromScheduledTaskQueue();
389 Runnable task = pollTask();
390 if (task == null) {
391 return false;
392 }
393
394 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
395 long runTasks = 0;
396 long lastExecutionTime;
397 for (;;) {
398 try {
399 task.run();
400 } catch (Throwable t) {
401 logger.warn("A task raised an exception.", t);
402 }
403
404 runTasks ++;
405
406
407
408 if ((runTasks & 0x3F) == 0) {
409 lastExecutionTime = ScheduledFutureTask.nanoTime();
410 if (lastExecutionTime >= deadline) {
411 break;
412 }
413 }
414
415 task = pollTask();
416 if (task == null) {
417 lastExecutionTime = ScheduledFutureTask.nanoTime();
418 break;
419 }
420 }
421
422 this.lastExecutionTime = lastExecutionTime;
423 return true;
424 }
425
426
427
428
429 protected long delayNanos(long currentTimeNanos) {
430 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
431 if (scheduledTask == null) {
432 return SCHEDULE_PURGE_INTERVAL;
433 }
434
435 return scheduledTask.delayNanos(currentTimeNanos);
436 }
437
438
439
440
441
442
443
444
/**
 * Sets {@code lastExecutionTime} to the current {@code ScheduledFutureTask.nanoTime()}.
 * {@link #confirmShutdown()} measures the graceful-shutdown quiet period from this value.
 */
protected void updateLastExecutionTime() {
    lastExecutionTime = ScheduledFutureTask.nanoTime();
}
448
449
450
451
/**
 * The main loop of the executor, invoked once on the executor thread. Implementations must call
 * {@link #confirmShutdown()} before returning; returning normally without having done so is logged
 * as an error by the thread wrapper.
 */
protected abstract void run();
453
454
455
456
/**
 * Hook invoked on the executor thread after shutdown has been confirmed and before the state is
 * set to terminated. Does nothing by default.
 */
protected void cleanup() {
    // NOOP
}
460
461 protected void wakeup(boolean inEventLoop) {
462 if (!inEventLoop || state == ST_SHUTTING_DOWN) {
463
464
465 taskQueue.offer(WAKEUP_TASK);
466 }
467 }
468
/** Returns {@code true} if {@code thread} is the very thread object owned by this executor. */
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}
473
474
475
476
477 public void addShutdownHook(final Runnable task) {
478 if (inEventLoop()) {
479 shutdownHooks.add(task);
480 } else {
481 execute(new Runnable() {
482 @Override
483 public void run() {
484 shutdownHooks.add(task);
485 }
486 });
487 }
488 }
489
490
491
492
493 public void removeShutdownHook(final Runnable task) {
494 if (inEventLoop()) {
495 shutdownHooks.remove(task);
496 } else {
497 execute(new Runnable() {
498 @Override
499 public void run() {
500 shutdownHooks.remove(task);
501 }
502 });
503 }
504 }
505
506 private boolean runShutdownHooks() {
507 boolean ran = false;
508
509 while (!shutdownHooks.isEmpty()) {
510 List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
511 shutdownHooks.clear();
512 for (Runnable task: copy) {
513 try {
514 task.run();
515 } catch (Throwable t) {
516 logger.warn("Shutdown hook raised an exception.", t);
517 } finally {
518 ran = true;
519 }
520 }
521 }
522
523 if (ran) {
524 lastExecutionTime = ScheduledFutureTask.nanoTime();
525 }
526
527 return ran;
528 }
529
530 @Override
531 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
532 if (quietPeriod < 0) {
533 throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
534 }
535 if (timeout < quietPeriod) {
536 throw new IllegalArgumentException(
537 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
538 }
539 if (unit == null) {
540 throw new NullPointerException("unit");
541 }
542
543 if (isShuttingDown()) {
544 return terminationFuture();
545 }
546
547 boolean inEventLoop = inEventLoop();
548 boolean wakeup;
549 int oldState;
550 for (;;) {
551 if (isShuttingDown()) {
552 return terminationFuture();
553 }
554 int newState;
555 wakeup = true;
556 oldState = state;
557 if (inEventLoop) {
558 newState = ST_SHUTTING_DOWN;
559 } else {
560 switch (oldState) {
561 case ST_NOT_STARTED:
562 case ST_STARTED:
563 newState = ST_SHUTTING_DOWN;
564 break;
565 default:
566 newState = oldState;
567 wakeup = false;
568 }
569 }
570 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
571 break;
572 }
573 }
574 gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
575 gracefulShutdownTimeout = unit.toNanos(timeout);
576
577 if (oldState == ST_NOT_STARTED) {
578 thread.start();
579 }
580
581 if (wakeup) {
582 wakeup(inEventLoop);
583 }
584
585 return terminationFuture();
586 }
587
/** Returns the future that the executor thread completes as the last step of termination. */
@Override
public Future<?> terminationFuture() {
    return terminationFuture;
}
592
/**
 * Moves the executor to the shut-down state, starts the thread if it never ran so that it can
 * perform the shutdown, and wakes it up. Returns immediately if the executor is already shut down
 * or already shutting down.
 *
 * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} is the non-deprecated alternative.
 */
@Override
@Deprecated
public void shutdown() {
    if (isShutdown()) {
        return;
    }

    boolean inEventLoop = inEventLoop();
    boolean wakeup;
    int oldState;
    for (;;) {
        // NOTE(review): this early return also covers ST_SHUTTING_DOWN, so the ST_SHUTTING_DOWN case
        // in the switch below looks reachable only through a race between this check and the
        // state read — confirm whether upgrading a graceful shutdown was intended.
        if (isShuttingDown()) {
            return;
        }
        int newState;
        wakeup = true;
        oldState = state;
        if (inEventLoop) {
            newState = ST_SHUTDOWN;
        } else {
            switch (oldState) {
                case ST_NOT_STARTED:
                case ST_STARTED:
                case ST_SHUTTING_DOWN:
                    newState = ST_SHUTDOWN;
                    break;
                default:
                    // Already shut down or terminated: keep the state and skip the wake-up.
                    newState = oldState;
                    wakeup = false;
            }
        }
        // Retry if another thread changed the state in between.
        if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
            break;
        }
    }

    // A never-started executor still needs its thread to run the shutdown sequence.
    if (oldState == ST_NOT_STARTED) {
        thread.start();
    }

    if (wakeup) {
        wakeup(inEventLoop);
    }
}
637
/** Returns {@code true} once the state is shutting-down or any later state (shut down, terminated). */
@Override
public boolean isShuttingDown() {
    return state >= ST_SHUTTING_DOWN;
}
642
/** Returns {@code true} once the state is shut-down or terminated. */
@Override
public boolean isShutdown() {
    return state >= ST_SHUTDOWN;
}
647
/** Returns {@code true} only when the executor thread has set the state to terminated. */
@Override
public boolean isTerminated() {
    return state == ST_TERMINATED;
}
652
653
654
655
/**
 * Performs one step of the shutdown sequence and reports whether the executor thread may now exit.
 * Callers are expected to invoke this repeatedly until it returns {@code true}.
 *
 * @return {@code true} if shutdown is complete; {@code false} if the executor is not shutting down
 *         or must keep running (tasks just ran, or the quiet period has not elapsed yet)
 * @throws IllegalStateException if called from a thread other than the event loop while shutting down
 */
protected boolean confirmShutdown() {
    if (!isShuttingDown()) {
        return false;
    }

    if (!inEventLoop()) {
        throw new IllegalStateException("must be invoked from an event loop");
    }

    cancelScheduledTasks();

    // Remember when the shutdown sequence began; the timeout is measured from here.
    if (gracefulShutdownStartTime == 0) {
        gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
    }

    if (runAllTasks() || runShutdownHooks()) {
        if (isShutdown()) {
            // State is already shut-down (or later): no quiet period applies, finish now.
            return true;
        }

        // Work was just done, so the quiet period starts over. With a zero quiet period there is
        // nothing to wait for; otherwise post a wake-up marker and report "not done yet".
        if (gracefulShutdownQuietPeriod == 0) {
            return true;
        }
        wakeup(true);
        return false;
    }

    final long nanoTime = ScheduledFutureTask.nanoTime();

    // Nothing ran this round: finish if shut down or if the overall timeout has been exceeded.
    if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
        return true;
    }

    if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
        // Still inside the quiet period: post a wake-up marker, pause 100 ms, and have the caller
        // check again for newly submitted tasks.
        wakeup(true);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Ignored: an interruption only shortens the pause.
        }

        return false;
    }

    // No task ran during the whole quiet period; shutdown can complete.
    return true;
}
710
711 @Override
712 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
713 if (unit == null) {
714 throw new NullPointerException("unit");
715 }
716
717 if (inEventLoop()) {
718 throw new IllegalStateException("cannot await termination of the current thread");
719 }
720
721 if (threadLock.tryAcquire(timeout, unit)) {
722 threadLock.release();
723 }
724
725 return isTerminated();
726 }
727
728 @Override
729 public void execute(Runnable task) {
730 if (task == null) {
731 throw new NullPointerException("task");
732 }
733
734 boolean inEventLoop = inEventLoop();
735 if (inEventLoop) {
736 addTask(task);
737 } else {
738 startThread();
739 addTask(task);
740 if (isShutdown() && removeTask(task)) {
741 reject();
742 }
743 }
744
745 if (!addTaskWakesUp && wakesUpForTask(task)) {
746 wakeup(inEventLoop);
747 }
748 }
749
/**
 * Delegates to the superclass implementation, but refuses to run when called from the event loop
 * thread.
 *
 * @throws RejectedExecutionException if called from within the event loop
 */
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
    throwIfInEventLoop("invokeAny");
    return super.invokeAny(tasks);
}
755
/**
 * Timed variant: delegates to the superclass implementation, but refuses to run when called from
 * the event loop thread.
 *
 * @throws RejectedExecutionException if called from within the event loop
 */
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    throwIfInEventLoop("invokeAny");
    return super.invokeAny(tasks, timeout, unit);
}
762
/**
 * Delegates to the superclass implementation, but refuses to run when called from the event loop
 * thread.
 *
 * @throws RejectedExecutionException if called from within the event loop
 */
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    throwIfInEventLoop("invokeAll");
    return super.invokeAll(tasks);
}
769
/**
 * Timed variant: delegates to the superclass implementation, but refuses to run when called from
 * the event loop thread.
 *
 * @throws RejectedExecutionException if called from within the event loop
 */
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
    throwIfInEventLoop("invokeAll");
    return super.invokeAll(tasks, timeout, unit);
}
776
777 private void throwIfInEventLoop(String method) {
778 if (inEventLoop()) {
779 throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
780 }
781 }
782
783
784
785
/**
 * Returns a {@link ThreadProperties} view of the executor thread. The view is created once in the
 * constructor and reads live values from the thread on each call.
 */
public final ThreadProperties threadProperties() {
    return threadProperties;
}
789
/**
 * Decides whether {@link #execute(Runnable)} should call {@link #wakeup(boolean)} after adding
 * {@code task}. This implementation always returns {@code true}; subclasses may override it.
 *
 * @param task the task that was just added (unused here)
 */
@SuppressWarnings("unused")
protected boolean wakesUpForTask(Runnable task) {
    return true;
}
794
/**
 * Always throws; used to signal that a task cannot be accepted.
 *
 * @throws RejectedExecutionException unconditionally
 */
protected static void reject() {
    throw new RejectedExecutionException("event executor terminated");
}
798
799
800
// One second in nanoseconds; returned by delayNanos(long) when no scheduled task is pending.
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
802
803 private void startThread() {
804 if (state == ST_NOT_STARTED) {
805 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
806 thread.start();
807 }
808 }
809 }
810
811 private static final class DefaultThreadProperties implements ThreadProperties {
812 private final Thread t;
813
814 DefaultThreadProperties(Thread t) {
815 this.t = t;
816 }
817
818 @Override
819 public State state() {
820 return t.getState();
821 }
822
823 @Override
824 public int priority() {
825 return t.getPriority();
826 }
827
828 @Override
829 public boolean isInterrupted() {
830 return t.isInterrupted();
831 }
832
833 @Override
834 public boolean isDaemon() {
835 return t.isDaemon();
836 }
837
838 @Override
839 public String name() {
840 return t.getName();
841 }
842
843 @Override
844 public long id() {
845 return t.getId();
846 }
847
848 @Override
849 public StackTraceElement[] stackTrace() {
850 return t.getStackTrace();
851 }
852
853 @Override
854 public boolean isAlive() {
855 return t.isAlive();
856 }
857 }
858 }