View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.embedded;
17  
18  import io.netty5.channel.Channel;
19  import io.netty5.channel.EventLoop;
20  import io.netty5.channel.IoHandle;
21  import io.netty5.util.concurrent.AbstractScheduledEventExecutor;
22  import io.netty5.util.concurrent.Future;
23  import io.netty5.util.concurrent.Promise;
24  import io.netty5.util.internal.StringUtil;
25  
26  import java.util.ArrayDeque;
27  import java.util.Queue;
28  import java.util.concurrent.TimeUnit;
29  
30  import static java.util.Objects.requireNonNull;
31  
32  final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
33      /*
34       * When time is not {@link #timeFrozen frozen}, the base time to subtract from {@link System#nanoTime()}. When time
35       * is frozen, this variable is unused.
36       *
37       * Initialized to {@link #initialNanoTime()} so that until one of the time mutator methods is called,
38       * {@link #getCurrentTimeNanos()} matches the default behavior.
39       */
40      private long startTime = initialNanoTime();
41      /**
42       * When time is frozen, the timestamp returned by {@link #getCurrentTimeNanos()}. When unfrozen, this is unused.
43       */
44      private long frozenTimestamp;
45      /**
46       * Whether time is currently frozen.
47       */
48      private boolean timeFrozen;
49  
50      private final Queue<Runnable> tasks = new ArrayDeque<>(2);
51      boolean running;
52  
53      private static EmbeddedChannel cast(IoHandle handle) {
54          if (handle instanceof EmbeddedChannel) {
55              return (EmbeddedChannel) handle;
56          }
57          throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(handle) + " not supported");
58      }
59  
60      @Override
61      public EventLoop next() {
62          return (EventLoop) super.next();
63      }
64  
65      @Override
66      public Future<Void> registerForIo(IoHandle handle) {
67          Promise<Void> promise = newPromise();
68          EmbeddedChannel channel = cast(handle);
69          if (inEventLoop()) {
70              registerForIO0(channel, promise);
71          } else {
72              execute(() -> registerForIO0(channel, promise));
73          }
74          return promise.asFuture();
75      }
76  
77      private void registerForIO0(EmbeddedChannel channel, Promise<Void> promise) {
78          assert inEventLoop();
79          try {
80              if (channel.isRegistered()) {
81                  throw new IllegalStateException("Channel already registered");
82              }
83              if (!channel.executor().inEventLoop()) {
84                  throw new IllegalStateException("Channel.executor() is not using the same Thread as this EventLoop");
85              }
86              channel.setActive();
87          } catch (Throwable cause) {
88              promise.setFailure(cause);
89              return;
90          }
91          promise.setSuccess(null);
92      }
93      @Override
94      public Future<Void> deregisterForIo(IoHandle handle) {
95          Promise<Void> promise = newPromise();
96          EmbeddedChannel channel = cast(handle);
97          if (inEventLoop()) {
98              deregisterForIO0(channel, promise);
99          } else {
100             execute(() -> deregisterForIO0(channel, promise));
101         }
102         return promise.asFuture();
103     }
104 
105     private void deregisterForIO0(Channel channel, Promise<Void> promise) {
106         try {
107             if (!channel.isRegistered()) {
108                 throw new IllegalStateException("Channel not registered");
109             }
110             if (!channel.executor().inEventLoop()) {
111                 throw new IllegalStateException("Channel.executor() is not using the same Thread as this EventLoop");
112             }
113         } catch (Throwable cause) {
114             promise.setFailure(cause);
115             return;
116         }
117         promise.setSuccess(null);
118     }
119 
120     @Override
121     public void execute(Runnable task) {
122         requireNonNull(task, "command");
123         tasks.add(task);
124         if (!running) {
125             runTasks();
126         }
127     }
128 
129     void runTasks() {
130         boolean wasRunning = running;
131         try {
132             for (;;) {
133                 running = true;
134                 Runnable task = tasks.poll();
135                 if (task == null) {
136                     break;
137                 }
138 
139                 task.run();
140             }
141         } finally {
142             if (!wasRunning) {
143                 running = false;
144             }
145         }
146     }
147 
148     boolean hasPendingNormalTasks() {
149         return !tasks.isEmpty();
150     }
151 
152     long runScheduledTasks() {
153         long time = getCurrentTimeNanos();
154         boolean wasRunning = running;
155         try {
156             for (;;) {
157                 running = true;
158                 Runnable task = pollScheduledTask(time);
159                 if (task == null) {
160                     return nextScheduledTaskNano();
161                 }
162 
163                 task.run();
164             }
165         } finally {
166             if (!wasRunning) {
167                 running = false;
168             }
169         }
170     }
171 
172     long nextScheduledTask() {
173         return nextScheduledTaskNano();
174     }
175 
176     void cancelScheduled() {
177         running = true;
178         try {
179             cancelScheduledTasks();
180         } finally {
181             running = false;
182         }
183     }
184 
185     @Override
186     protected long getCurrentTimeNanos() {
187         if (timeFrozen) {
188             return frozenTimestamp;
189         }
190         return System.nanoTime() - startTime;
191     }
192 
193     void advanceTimeBy(long nanos) {
194         if (timeFrozen) {
195             frozenTimestamp += nanos;
196         } else {
197             // startTime is subtracted from nanoTime, so increasing the startTime will advance getCurrentTimeNanos
198             startTime -= nanos;
199         }
200     }
201 
202     void freezeTime() {
203         if (!timeFrozen) {
204             frozenTimestamp = getCurrentTimeNanos();
205             timeFrozen = true;
206         }
207     }
208 
209     void unfreezeTime() {
210         if (timeFrozen) {
211             // we want getCurrentTimeNanos to continue right where frozenTimestamp left off:
212             // getCurrentTimeNanos = nanoTime - startTime = frozenTimestamp
213             // then solve for startTime
214             startTime = System.nanoTime() - frozenTimestamp;
215             timeFrozen = false;
216         }
217     }
218 
219     @Override
220     public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
221         throw new UnsupportedOperationException();
222     }
223 
224     @Override
225     public Future<Void> terminationFuture() {
226         throw new UnsupportedOperationException();
227     }
228 
229     @Override
230     public boolean isShuttingDown() {
231         return false;
232     }
233 
234     @Override
235     public boolean isShutdown() {
236         return false;
237     }
238 
239     @Override
240     public boolean isTerminated() {
241         return false;
242     }
243 
244     @Override
245     public boolean awaitTermination(long timeout, TimeUnit unit) {
246         return false;
247     }
248 
249     @Override
250     public boolean inEventLoop(Thread thread) {
251         return running;
252     }
253 
254     @Override
255     public boolean isCompatible(Class<? extends IoHandle> handleType) {
256         return EmbeddedChannel.class.isAssignableFrom(handleType);
257     }
258 }