1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.embedded;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelPromise;
21 import io.netty.channel.DefaultChannelPromise;
22 import io.netty.channel.EventLoop;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.util.concurrent.AbstractScheduledEventExecutor;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.MockTicker;
27 import io.netty.util.concurrent.Ticker;
28 import io.netty.util.internal.ObjectUtil;
29
30 import java.util.ArrayDeque;
31 import java.util.Queue;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.locks.Condition;
34 import java.util.concurrent.locks.Lock;
35 import java.util.concurrent.locks.ReentrantLock;
36
37 final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
38 private final FreezableTicker ticker = new FreezableTicker();
39
40 private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
41
42 @Override
43 public EventLoopGroup parent() {
44 return (EventLoopGroup) super.parent();
45 }
46
47 @Override
48 public EventLoop next() {
49 return (EventLoop) super.next();
50 }
51
52 @Override
53 public void execute(Runnable command) {
54 tasks.add(ObjectUtil.checkNotNull(command, "command"));
55 }
56
57 void runTasks() {
58 for (;;) {
59 Runnable task = tasks.poll();
60 if (task == null) {
61 break;
62 }
63
64 task.run();
65 }
66 }
67
68 boolean hasPendingNormalTasks() {
69 return !tasks.isEmpty();
70 }
71
72 long runScheduledTasks() {
73 long time = getCurrentTimeNanos();
74 for (;;) {
75 Runnable task = pollScheduledTask(time);
76 if (task == null) {
77 return nextScheduledTaskNano();
78 }
79
80 task.run();
81 }
82 }
83
84 long nextScheduledTask() {
85 return nextScheduledTaskNano();
86 }
87
88 @Override
89 public Ticker ticker() {
90 return ticker;
91 }
92
93 @Override
94 protected long getCurrentTimeNanos() {
95 return ticker.nanoTime();
96 }
97
98 void advanceTimeBy(long nanos) {
99 ticker.advance(nanos, TimeUnit.NANOSECONDS);
100 }
101
102 void freezeTime() {
103 ticker.freezeTime();
104 }
105
106 void unfreezeTime() {
107 ticker.unfreezeTime();
108 }
109
110 @Override
111 protected void cancelScheduledTasks() {
112 super.cancelScheduledTasks();
113 }
114
115 @Override
116 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
117 throw new UnsupportedOperationException();
118 }
119
120 @Override
121 public Future<?> terminationFuture() {
122 throw new UnsupportedOperationException();
123 }
124
125 @Override
126 @Deprecated
127 public void shutdown() {
128 throw new UnsupportedOperationException();
129 }
130
131 @Override
132 public boolean isShuttingDown() {
133 return false;
134 }
135
136 @Override
137 public boolean isShutdown() {
138 return false;
139 }
140
141 @Override
142 public boolean isTerminated() {
143 return false;
144 }
145
146 @Override
147 public boolean awaitTermination(long timeout, TimeUnit unit) {
148 return false;
149 }
150
151 @Override
152 public ChannelFuture register(Channel channel) {
153 return register(new DefaultChannelPromise(channel, this));
154 }
155
156 @Override
157 public ChannelFuture register(ChannelPromise promise) {
158 ObjectUtil.checkNotNull(promise, "promise");
159 promise.channel().unsafe().register(this, promise);
160 return promise;
161 }
162
163 @Deprecated
164 @Override
165 public ChannelFuture register(Channel channel, ChannelPromise promise) {
166 channel.unsafe().register(this, promise);
167 return promise;
168 }
169
170 @Override
171 public boolean inEventLoop() {
172 return true;
173 }
174
175 @Override
176 public boolean inEventLoop(Thread thread) {
177 return true;
178 }
179
180
181
182
183 private static final class FreezableTicker implements MockTicker {
184 private final Ticker unfrozen = Ticker.systemTicker();
185 private final Lock lock = new ReentrantLock();
186 private final Condition cond = lock.newCondition();
187
188
189
190
191 private long startTime;
192
193
194
195 private long frozenTimestamp;
196
197
198
199 private boolean timeFrozen;
200
201 @Override
202 public void advance(long amount, TimeUnit unit) {
203 lock.lock();
204 try {
205 long nanos = unit.toNanos(amount);
206 if (timeFrozen) {
207 frozenTimestamp += nanos;
208 } else {
209
210
211 startTime -= nanos;
212 }
213 cond.signalAll();
214 } finally {
215 lock.unlock();
216 }
217 }
218
219 @Override
220 public long nanoTime() {
221 lock.lock();
222 try {
223 if (timeFrozen) {
224 return frozenTimestamp;
225 }
226 return unfrozen.nanoTime() - startTime;
227 } finally {
228 lock.unlock();
229 }
230 }
231
232 @Override
233 public void sleep(long delay, TimeUnit unit) throws InterruptedException {
234 long deadline = nanoTime() + unit.toNanos(delay);
235 lock.lockInterruptibly();
236 try {
237 while (true) {
238 long timeout = deadline - nanoTime();
239 if (timeout < 0) {
240 break;
241 }
242 if (timeFrozen) {
243 cond.await();
244 } else {
245 cond.awaitNanos(timeout);
246 }
247 }
248 } finally {
249 lock.unlock();
250 }
251 }
252
253 public void freezeTime() {
254 lock.lock();
255 try {
256 if (!timeFrozen) {
257 frozenTimestamp = nanoTime();
258 timeFrozen = true;
259 }
260 } finally {
261 lock.unlock();
262 }
263 }
264
265 public void unfreezeTime() {
266 lock.lock();
267 try {
268 if (timeFrozen) {
269
270
271
272 startTime = unfrozen.nanoTime() - frozenTimestamp;
273 timeFrozen = false;
274 }
275 } finally {
276 lock.unlock();
277 }
278 }
279 }
280 }