1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.embedded;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelPromise;
21 import io.netty.channel.DefaultChannelPromise;
22 import io.netty.channel.EventLoop;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.util.concurrent.AbstractScheduledEventExecutor;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.internal.ObjectUtil;
27
28 import java.util.ArrayDeque;
29 import java.util.Queue;
30 import java.util.concurrent.TimeUnit;
31
32 final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
33
34
35
36
37
38
39
40 private long startTime = initialNanoTime();
41
42
43
44 private long frozenTimestamp;
45
46
47
48 private boolean timeFrozen;
49
50 private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
51
52 @Override
53 public EventLoopGroup parent() {
54 return (EventLoopGroup) super.parent();
55 }
56
57 @Override
58 public EventLoop next() {
59 return (EventLoop) super.next();
60 }
61
62 @Override
63 public void execute(Runnable command) {
64 tasks.add(ObjectUtil.checkNotNull(command, "command"));
65 }
66
67 void runTasks() {
68 for (;;) {
69 Runnable task = tasks.poll();
70 if (task == null) {
71 break;
72 }
73
74 task.run();
75 }
76 }
77
78 long runScheduledTasks() {
79 long time = getCurrentTimeNanos();
80 for (;;) {
81 Runnable task = pollScheduledTask(time);
82 if (task == null) {
83 return nextScheduledTaskNano();
84 }
85
86 task.run();
87 }
88 }
89
90 long nextScheduledTask() {
91 return nextScheduledTaskNano();
92 }
93
94 @Override
95 protected long getCurrentTimeNanos() {
96 if (timeFrozen) {
97 return frozenTimestamp;
98 }
99 return System.nanoTime() - startTime;
100 }
101
102 void advanceTimeBy(long nanos) {
103 if (timeFrozen) {
104 frozenTimestamp += nanos;
105 } else {
106
107 startTime -= nanos;
108 }
109 }
110
111 void freezeTime() {
112 if (!timeFrozen) {
113 frozenTimestamp = getCurrentTimeNanos();
114 timeFrozen = true;
115 }
116 }
117
118 void unfreezeTime() {
119 if (timeFrozen) {
120
121
122
123 startTime = System.nanoTime() - frozenTimestamp;
124 timeFrozen = false;
125 }
126 }
127
128 @Override
129 protected void cancelScheduledTasks() {
130 super.cancelScheduledTasks();
131 }
132
133 @Override
134 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
135 throw new UnsupportedOperationException();
136 }
137
138 @Override
139 public Future<?> terminationFuture() {
140 throw new UnsupportedOperationException();
141 }
142
143 @Override
144 @Deprecated
145 public void shutdown() {
146 throw new UnsupportedOperationException();
147 }
148
149 @Override
150 public boolean isShuttingDown() {
151 return false;
152 }
153
154 @Override
155 public boolean isShutdown() {
156 return false;
157 }
158
159 @Override
160 public boolean isTerminated() {
161 return false;
162 }
163
164 @Override
165 public boolean awaitTermination(long timeout, TimeUnit unit) {
166 return false;
167 }
168
169 @Override
170 public ChannelFuture register(Channel channel) {
171 return register(new DefaultChannelPromise(channel, this));
172 }
173
174 @Override
175 public ChannelFuture register(ChannelPromise promise) {
176 ObjectUtil.checkNotNull(promise, "promise");
177 promise.channel().unsafe().register(this, promise);
178 return promise;
179 }
180
181 @Deprecated
182 @Override
183 public ChannelFuture register(Channel channel, ChannelPromise promise) {
184 channel.unsafe().register(this, promise);
185 return promise;
186 }
187
188 @Override
189 public boolean inEventLoop() {
190 return true;
191 }
192
193 @Override
194 public boolean inEventLoop(Thread thread) {
195 return true;
196 }
197 }