1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.embedded;
17
18 import io.netty5.channel.Channel;
19 import io.netty5.channel.EventLoop;
20 import io.netty5.channel.IoHandle;
21 import io.netty5.util.concurrent.AbstractScheduledEventExecutor;
22 import io.netty5.util.concurrent.Future;
23 import io.netty5.util.concurrent.Promise;
24 import io.netty5.util.internal.StringUtil;
25
26 import java.util.ArrayDeque;
27 import java.util.Queue;
28 import java.util.concurrent.TimeUnit;
29
30 import static java.util.Objects.requireNonNull;
31
32 final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
33
34
35
36
37
38
39
40 private long startTime = initialNanoTime();
41
42
43
44 private long frozenTimestamp;
45
46
47
48 private boolean timeFrozen;
49
50 private final Queue<Runnable> tasks = new ArrayDeque<>(2);
51 boolean running;
52
53 private static EmbeddedChannel cast(IoHandle handle) {
54 if (handle instanceof EmbeddedChannel) {
55 return (EmbeddedChannel) handle;
56 }
57 throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(handle) + " not supported");
58 }
59
60 @Override
61 public EventLoop next() {
62 return (EventLoop) super.next();
63 }
64
65 @Override
66 public Future<Void> registerForIo(IoHandle handle) {
67 Promise<Void> promise = newPromise();
68 EmbeddedChannel channel = cast(handle);
69 if (inEventLoop()) {
70 registerForIO0(channel, promise);
71 } else {
72 execute(() -> registerForIO0(channel, promise));
73 }
74 return promise.asFuture();
75 }
76
77 private void registerForIO0(EmbeddedChannel channel, Promise<Void> promise) {
78 assert inEventLoop();
79 try {
80 if (channel.isRegistered()) {
81 throw new IllegalStateException("Channel already registered");
82 }
83 if (!channel.executor().inEventLoop()) {
84 throw new IllegalStateException("Channel.executor() is not using the same Thread as this EventLoop");
85 }
86 channel.setActive();
87 } catch (Throwable cause) {
88 promise.setFailure(cause);
89 return;
90 }
91 promise.setSuccess(null);
92 }
93 @Override
94 public Future<Void> deregisterForIo(IoHandle handle) {
95 Promise<Void> promise = newPromise();
96 EmbeddedChannel channel = cast(handle);
97 if (inEventLoop()) {
98 deregisterForIO0(channel, promise);
99 } else {
100 execute(() -> deregisterForIO0(channel, promise));
101 }
102 return promise.asFuture();
103 }
104
105 private void deregisterForIO0(Channel channel, Promise<Void> promise) {
106 try {
107 if (!channel.isRegistered()) {
108 throw new IllegalStateException("Channel not registered");
109 }
110 if (!channel.executor().inEventLoop()) {
111 throw new IllegalStateException("Channel.executor() is not using the same Thread as this EventLoop");
112 }
113 } catch (Throwable cause) {
114 promise.setFailure(cause);
115 return;
116 }
117 promise.setSuccess(null);
118 }
119
120 @Override
121 public void execute(Runnable task) {
122 requireNonNull(task, "command");
123 tasks.add(task);
124 if (!running) {
125 runTasks();
126 }
127 }
128
129 void runTasks() {
130 boolean wasRunning = running;
131 try {
132 for (;;) {
133 running = true;
134 Runnable task = tasks.poll();
135 if (task == null) {
136 break;
137 }
138
139 task.run();
140 }
141 } finally {
142 if (!wasRunning) {
143 running = false;
144 }
145 }
146 }
147
148 boolean hasPendingNormalTasks() {
149 return !tasks.isEmpty();
150 }
151
152 long runScheduledTasks() {
153 long time = getCurrentTimeNanos();
154 boolean wasRunning = running;
155 try {
156 for (;;) {
157 running = true;
158 Runnable task = pollScheduledTask(time);
159 if (task == null) {
160 return nextScheduledTaskNano();
161 }
162
163 task.run();
164 }
165 } finally {
166 if (!wasRunning) {
167 running = false;
168 }
169 }
170 }
171
172 long nextScheduledTask() {
173 return nextScheduledTaskNano();
174 }
175
176 void cancelScheduled() {
177 running = true;
178 try {
179 cancelScheduledTasks();
180 } finally {
181 running = false;
182 }
183 }
184
185 @Override
186 protected long getCurrentTimeNanos() {
187 if (timeFrozen) {
188 return frozenTimestamp;
189 }
190 return System.nanoTime() - startTime;
191 }
192
193 void advanceTimeBy(long nanos) {
194 if (timeFrozen) {
195 frozenTimestamp += nanos;
196 } else {
197
198 startTime -= nanos;
199 }
200 }
201
202 void freezeTime() {
203 if (!timeFrozen) {
204 frozenTimestamp = getCurrentTimeNanos();
205 timeFrozen = true;
206 }
207 }
208
209 void unfreezeTime() {
210 if (timeFrozen) {
211
212
213
214 startTime = System.nanoTime() - frozenTimestamp;
215 timeFrozen = false;
216 }
217 }
218
219 @Override
220 public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
221 throw new UnsupportedOperationException();
222 }
223
224 @Override
225 public Future<Void> terminationFuture() {
226 throw new UnsupportedOperationException();
227 }
228
229 @Override
230 public boolean isShuttingDown() {
231 return false;
232 }
233
234 @Override
235 public boolean isShutdown() {
236 return false;
237 }
238
239 @Override
240 public boolean isTerminated() {
241 return false;
242 }
243
244 @Override
245 public boolean awaitTermination(long timeout, TimeUnit unit) {
246 return false;
247 }
248
249 @Override
250 public boolean inEventLoop(Thread thread) {
251 return running;
252 }
253
254 @Override
255 public boolean isCompatible(Class<? extends IoHandle> handleType) {
256 return EmbeddedChannel.class.isAssignableFrom(handleType);
257 }
258 }