1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.embedded;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelPromise;
21 import io.netty.channel.DefaultChannelPromise;
22 import io.netty.channel.EventLoop;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.util.concurrent.AbstractScheduledEventExecutor;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.MockTicker;
27 import io.netty.util.concurrent.Ticker;
28 import io.netty.util.internal.ObjectUtil;
29
30 import java.util.ArrayDeque;
31 import java.util.Queue;
32 import java.util.concurrent.TimeUnit;
33
34 final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
/** Time source handed in at construction; exposed via {@link #ticker()} and read by {@link #getCurrentTimeNanos()}. */
private final Ticker ticker;

/** Tasks submitted through {@link #execute(Runnable)} that are waiting for {@link #runTasks()} to run them. */
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);

/**
 * Creates an event loop that reads time from the given ticker.
 *
 * @param ticker the time source stored by this loop; it is stored as given, without validation
 */
EmbeddedEventLoop(Ticker ticker) {
    this.ticker = ticker;
}
42
43 @Override
44 public EventLoopGroup parent() {
45 return (EventLoopGroup) super.parent();
46 }
47
48 @Override
49 public EventLoop next() {
50 return (EventLoop) super.next();
51 }
52
53 @Override
54 public void execute(Runnable command) {
55 tasks.add(ObjectUtil.checkNotNull(command, "command"));
56 }
57
58 void runTasks() {
59 for (;;) {
60 Runnable task = tasks.poll();
61 if (task == null) {
62 break;
63 }
64
65 task.run();
66 }
67 }
68
69 boolean hasPendingNormalTasks() {
70 return !tasks.isEmpty();
71 }
72
73 long runScheduledTasks() {
74 long time = getCurrentTimeNanos();
75 for (;;) {
76 Runnable task = pollScheduledTask(time);
77 if (task == null) {
78 return nextScheduledTaskNano();
79 }
80
81 task.run();
82 }
83 }
84
85 long nextScheduledTask() {
86 return nextScheduledTaskNano();
87 }
88
89 @Override
90 public Ticker ticker() {
91 return ticker;
92 }
93
94 @Override
95 protected long getCurrentTimeNanos() {
96 return ticker.nanoTime();
97 }
98
99 @Override
100 protected void cancelScheduledTasks() {
101 super.cancelScheduledTasks();
102 }
103
104 @Override
105 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
106 throw new UnsupportedOperationException();
107 }
108
109 @Override
110 public Future<?> terminationFuture() {
111 throw new UnsupportedOperationException();
112 }
113
114 @Override
115 @Deprecated
116 public void shutdown() {
117 throw new UnsupportedOperationException();
118 }
119
120 @Override
121 public boolean isShuttingDown() {
122 return false;
123 }
124
125 @Override
126 public boolean isShutdown() {
127 return false;
128 }
129
130 @Override
131 public boolean isTerminated() {
132 return false;
133 }
134
135 @Override
136 public boolean awaitTermination(long timeout, TimeUnit unit) {
137 return false;
138 }
139
140 @Override
141 public ChannelFuture register(Channel channel) {
142 return register(new DefaultChannelPromise(channel, this));
143 }
144
145 @Override
146 public ChannelFuture register(ChannelPromise promise) {
147 ObjectUtil.checkNotNull(promise, "promise");
148 promise.channel().unsafe().register(this, promise);
149 return promise;
150 }
151
152 @Deprecated
153 @Override
154 public ChannelFuture register(Channel channel, ChannelPromise promise) {
155 channel.unsafe().register(this, promise);
156 return promise;
157 }
158
159 @Override
160 public boolean inEventLoop() {
161 return true;
162 }
163
164 @Override
165 public boolean inEventLoop(Thread thread) {
166 return true;
167 }
168
169
170
171
172 static final class FreezableTicker implements MockTicker {
173 private final Ticker unfrozen = Ticker.systemTicker();
174
175
176
177
178 private long startTime;
179
180
181
182 private long frozenTimestamp;
183
184
185
186 private boolean timeFrozen;
187
188 @Override
189 public void advance(long amount, TimeUnit unit) {
190 long nanos = unit.toNanos(amount);
191 if (timeFrozen) {
192 frozenTimestamp += nanos;
193 } else {
194
195
196 startTime -= nanos;
197 }
198 }
199
200 @Override
201 public long nanoTime() {
202 if (timeFrozen) {
203 return frozenTimestamp;
204 }
205 return unfrozen.nanoTime() - startTime;
206 }
207
208 @Override
209 public void sleep(long delay, TimeUnit unit) throws InterruptedException {
210 throw new UnsupportedOperationException("Sleeping is not supported by the default ticker for " +
211 "EmbeddedEventLoop. Please use a different ticker implementation if you require sleep support.");
212 }
213
214 public void freezeTime() {
215 if (!timeFrozen) {
216 frozenTimestamp = nanoTime();
217 timeFrozen = true;
218 }
219 }
220
221 public void unfreezeTime() {
222 if (timeFrozen) {
223
224
225
226 startTime = unfrozen.nanoTime() - frozenTimestamp;
227 timeFrozen = false;
228 }
229 }
230 }
231 }