View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.embedded;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelPromise;
21  import io.netty.channel.DefaultChannelPromise;
22  import io.netty.channel.EventLoop;
23  import io.netty.channel.EventLoopGroup;
24  import io.netty.util.concurrent.AbstractScheduledEventExecutor;
25  import io.netty.util.concurrent.Future;
26  import io.netty.util.concurrent.MockTicker;
27  import io.netty.util.concurrent.Ticker;
28  import io.netty.util.internal.ObjectUtil;
29  
30  import java.util.ArrayDeque;
31  import java.util.Queue;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.locks.Condition;
34  import java.util.concurrent.locks.Lock;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
38      private final FreezableTicker ticker = new FreezableTicker();
39  
40      private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
41  
42      @Override
43      public EventLoopGroup parent() {
44          return (EventLoopGroup) super.parent();
45      }
46  
47      @Override
48      public EventLoop next() {
49          return (EventLoop) super.next();
50      }
51  
52      @Override
53      public void execute(Runnable command) {
54          tasks.add(ObjectUtil.checkNotNull(command, "command"));
55      }
56  
57      void runTasks() {
58          for (;;) {
59              Runnable task = tasks.poll();
60              if (task == null) {
61                  break;
62              }
63  
64              task.run();
65          }
66      }
67  
68      boolean hasPendingNormalTasks() {
69          return !tasks.isEmpty();
70      }
71  
72      long runScheduledTasks() {
73          long time = getCurrentTimeNanos();
74          for (;;) {
75              Runnable task = pollScheduledTask(time);
76              if (task == null) {
77                  return nextScheduledTaskNano();
78              }
79  
80              task.run();
81          }
82      }
83  
84      long nextScheduledTask() {
85          return nextScheduledTaskNano();
86      }
87  
88      @Override
89      public Ticker ticker() {
90          return ticker;
91      }
92  
93      @Override
94      protected long getCurrentTimeNanos() {
95          return ticker.nanoTime();
96      }
97  
98      void advanceTimeBy(long nanos) {
99          ticker.advance(nanos, TimeUnit.NANOSECONDS);
100     }
101 
102     void freezeTime() {
103         ticker.freezeTime();
104     }
105 
106     void unfreezeTime() {
107         ticker.unfreezeTime();
108     }
109 
110     @Override
111     protected void cancelScheduledTasks() {
112         super.cancelScheduledTasks();
113     }
114 
115     @Override
116     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
117         throw new UnsupportedOperationException();
118     }
119 
120     @Override
121     public Future<?> terminationFuture() {
122         throw new UnsupportedOperationException();
123     }
124 
125     @Override
126     @Deprecated
127     public void shutdown() {
128         throw new UnsupportedOperationException();
129     }
130 
131     @Override
132     public boolean isShuttingDown() {
133         return false;
134     }
135 
136     @Override
137     public boolean isShutdown() {
138         return false;
139     }
140 
141     @Override
142     public boolean isTerminated() {
143         return false;
144     }
145 
146     @Override
147     public boolean awaitTermination(long timeout, TimeUnit unit) {
148         return false;
149     }
150 
151     @Override
152     public ChannelFuture register(Channel channel) {
153         return register(new DefaultChannelPromise(channel, this));
154     }
155 
156     @Override
157     public ChannelFuture register(ChannelPromise promise) {
158         ObjectUtil.checkNotNull(promise, "promise");
159         promise.channel().unsafe().register(this, promise);
160         return promise;
161     }
162 
163     @Deprecated
164     @Override
165     public ChannelFuture register(Channel channel, ChannelPromise promise) {
166         channel.unsafe().register(this, promise);
167         return promise;
168     }
169 
170     @Override
171     public boolean inEventLoop() {
172         return true;
173     }
174 
175     @Override
176     public boolean inEventLoop(Thread thread) {
177         return true;
178     }
179 
180     /**
181      * Ticker that implements the old {@link EmbeddedChannel} time freezing mechanics.
182      */
183     private static final class FreezableTicker implements MockTicker {
184         private final Ticker unfrozen = Ticker.systemTicker();
185         private final Lock lock = new ReentrantLock();
186         private final Condition cond = lock.newCondition();
187         /**
188          * When time is not {@link #timeFrozen frozen}, the base time to subtract from {@link System#nanoTime()}. When
189          * time is frozen, this variable is unused.
190          */
191         private long startTime;
192         /**
193          * When time is frozen, the timestamp returned by {@link #getCurrentTimeNanos()}. When unfrozen, this is unused.
194          */
195         private long frozenTimestamp;
196         /**
197          * Whether time is currently frozen.
198          */
199         private boolean timeFrozen;
200 
201         @Override
202         public void advance(long amount, TimeUnit unit) {
203             lock.lock();
204             try {
205                 long nanos = unit.toNanos(amount);
206                 if (timeFrozen) {
207                     frozenTimestamp += nanos;
208                 } else {
209                     // startTime is subtracted from nanoTime, so increasing the startTime will advance
210                     // getCurrentTimeNanos
211                     startTime -= nanos;
212                 }
213                 cond.signalAll();
214             } finally {
215                 lock.unlock();
216             }
217         }
218 
219         @Override
220         public long nanoTime() {
221             lock.lock();
222             try {
223                 if (timeFrozen) {
224                     return frozenTimestamp;
225                 }
226                 return unfrozen.nanoTime() - startTime;
227             } finally {
228                 lock.unlock();
229             }
230         }
231 
232         @Override
233         public void sleep(long delay, TimeUnit unit) throws InterruptedException {
234             long deadline = nanoTime() + unit.toNanos(delay);
235             lock.lockInterruptibly();
236             try {
237                 while (true) {
238                     long timeout = deadline - nanoTime();
239                     if (timeout < 0) {
240                         break;
241                     }
242                     if (timeFrozen) {
243                         cond.await();
244                     } else {
245                         cond.awaitNanos(timeout);
246                     }
247                 }
248             } finally {
249                 lock.unlock();
250             }
251         }
252 
253         public void freezeTime() {
254             lock.lock();
255             try {
256                 if (!timeFrozen) {
257                     frozenTimestamp = nanoTime();
258                     timeFrozen = true;
259                 }
260             } finally {
261                 lock.unlock();
262             }
263         }
264 
265         public void unfreezeTime() {
266             lock.lock();
267             try {
268                 if (timeFrozen) {
269                     // we want getCurrentTimeNanos to continue right where frozenTimestamp left off:
270                     // nanoTime = unfrozen.nanoTime - startTime = frozenTimestamp
271                     // then solve for startTime
272                     startTime = unfrozen.nanoTime() - frozenTimestamp;
273                     timeFrozen = false;
274                 }
275             } finally {
276                 lock.unlock();
277             }
278         }
279     }
280 }