1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.util.concurrent;
17
18 import io.netty5.util.internal.logging.InternalLogger;
19 import io.netty5.util.internal.logging.InternalLoggerFactory;
20
21 import java.util.ArrayDeque;
22 import java.util.Queue;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.TimeUnit;
25
26 import static java.util.Objects.requireNonNull;
27
28
29
30
31
32
33
34
35 public final class ImmediateEventExecutor extends AbstractEventExecutor {
36 private static final InternalLogger logger = InternalLoggerFactory.getInstance(ImmediateEventExecutor.class);
37 public static final ImmediateEventExecutor INSTANCE = new ImmediateEventExecutor();
38
39
40
41 private static final FastThreadLocal<Queue<Runnable>> DELAYED_RUNNABLES = new FastThreadLocal<>() {
42 @Override
43 protected Queue<Runnable> initialValue() throws Exception {
44 return new ArrayDeque<>();
45 }
46 };
47
48
49
50 private static final FastThreadLocal<Boolean> RUNNING = new FastThreadLocal<>() {
51 @Override
52 protected Boolean initialValue() throws Exception {
53 return false;
54 }
55 };
56
57 private final Future<Void> terminationFuture = DefaultPromise.<Void>newFailedPromise(
58 GlobalEventExecutor.INSTANCE, new UnsupportedOperationException()).asFuture();
59
60 private ImmediateEventExecutor() { }
61
62 @Override
63 public boolean inEventLoop(Thread thread) {
64 return true;
65 }
66
67 @Override
68 public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
69 return terminationFuture();
70 }
71
72 @Override
73 public Future<Void> terminationFuture() {
74 return terminationFuture;
75 }
76
77 @Override
78 public boolean isShuttingDown() {
79 return false;
80 }
81
82 @Override
83 public boolean isShutdown() {
84 return false;
85 }
86
87 @Override
88 public boolean isTerminated() {
89 return false;
90 }
91
92 @Override
93 public boolean awaitTermination(long timeout, TimeUnit unit) {
94 return false;
95 }
96
97 @Override
98 public void execute(Runnable task) {
99 requireNonNull(task, "command");
100 if (!RUNNING.get()) {
101 RUNNING.set(true);
102 try {
103 task.run();
104 } catch (Throwable cause) {
105 logger.info("Throwable caught while executing Runnable {}", task, cause);
106 } finally {
107 Queue<Runnable> delayedRunnables = DELAYED_RUNNABLES.get();
108 Runnable runnable;
109 while ((runnable = delayedRunnables.poll()) != null) {
110 try {
111 runnable.run();
112 } catch (Throwable cause) {
113 logger.info("Throwable caught while executing Runnable {}", runnable, cause);
114 }
115 }
116 RUNNING.set(false);
117 }
118 } else {
119 DELAYED_RUNNABLES.get().add(task);
120 }
121 }
122
123 @Override
124 public <V> Promise<V> newPromise() {
125 return new ImmediatePromise<>(this);
126 }
127
128 @Override
129 public Future<Void> schedule(Runnable task, long delay,
130 TimeUnit unit) {
131 throw new UnsupportedOperationException();
132 }
133
134 @Override
135 public <V> Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
136 throw new UnsupportedOperationException();
137 }
138
139 @Override
140 public Future<Void> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
141 throw new UnsupportedOperationException();
142 }
143
144 @Override
145 public Future<Void> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
146 throw new UnsupportedOperationException();
147 }
148
149 static class ImmediatePromise<V> extends DefaultPromise<V> {
150 ImmediatePromise(EventExecutor executor) {
151 super(executor);
152 }
153
154 @Override
155 protected void checkDeadLock() {
156
157 }
158 }
159 }