View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.embedded;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelPromise;
21  import io.netty.channel.DefaultChannelPromise;
22  import io.netty.channel.EventLoop;
23  import io.netty.channel.EventLoopGroup;
24  import io.netty.util.concurrent.AbstractScheduledEventExecutor;
25  import io.netty.util.concurrent.Future;
26  import io.netty.util.concurrent.MockTicker;
27  import io.netty.util.concurrent.Ticker;
28  import io.netty.util.internal.ObjectUtil;
29  
30  import java.util.ArrayDeque;
31  import java.util.Queue;
32  import java.util.concurrent.TimeUnit;
33  
34  final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
35      private final Ticker ticker;
36  
37      private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
38  
39      EmbeddedEventLoop(Ticker ticker) {
40          this.ticker = ticker;
41      }
42  
43      @Override
44      public EventLoopGroup parent() {
45          return (EventLoopGroup) super.parent();
46      }
47  
48      @Override
49      public EventLoop next() {
50          return (EventLoop) super.next();
51      }
52  
53      @Override
54      public void execute(Runnable command) {
55          tasks.add(ObjectUtil.checkNotNull(command, "command"));
56      }
57  
58      void runTasks() {
59          for (;;) {
60              Runnable task = tasks.poll();
61              if (task == null) {
62                  break;
63              }
64  
65              task.run();
66          }
67      }
68  
69      boolean hasPendingNormalTasks() {
70          return !tasks.isEmpty();
71      }
72  
73      long runScheduledTasks() {
74          long time = getCurrentTimeNanos();
75          for (;;) {
76              Runnable task = pollScheduledTask(time);
77              if (task == null) {
78                  return nextScheduledTaskNano();
79              }
80  
81              task.run();
82          }
83      }
84  
85      long nextScheduledTask() {
86          return nextScheduledTaskNano();
87      }
88  
89      @Override
90      public Ticker ticker() {
91          return ticker;
92      }
93  
94      @Override
95      protected long getCurrentTimeNanos() {
96          return ticker.nanoTime();
97      }
98  
99      @Override
100     protected void cancelScheduledTasks() {
101         super.cancelScheduledTasks();
102     }
103 
104     @Override
105     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
106         throw new UnsupportedOperationException();
107     }
108 
109     @Override
110     public Future<?> terminationFuture() {
111         throw new UnsupportedOperationException();
112     }
113 
114     @Override
115     @Deprecated
116     public void shutdown() {
117         throw new UnsupportedOperationException();
118     }
119 
120     @Override
121     public boolean isShuttingDown() {
122         return false;
123     }
124 
125     @Override
126     public boolean isShutdown() {
127         return false;
128     }
129 
130     @Override
131     public boolean isTerminated() {
132         return false;
133     }
134 
135     @Override
136     public boolean awaitTermination(long timeout, TimeUnit unit) {
137         return false;
138     }
139 
140     @Override
141     public ChannelFuture register(Channel channel) {
142         return register(new DefaultChannelPromise(channel, this));
143     }
144 
145     @Override
146     public ChannelFuture register(ChannelPromise promise) {
147         ObjectUtil.checkNotNull(promise, "promise");
148         promise.channel().unsafe().register(this, promise);
149         return promise;
150     }
151 
152     @Deprecated
153     @Override
154     public ChannelFuture register(Channel channel, ChannelPromise promise) {
155         channel.unsafe().register(this, promise);
156         return promise;
157     }
158 
159     @Override
160     public boolean inEventLoop() {
161         return true;
162     }
163 
164     @Override
165     public boolean inEventLoop(Thread thread) {
166         return true;
167     }
168 
169     /**
170      * Ticker that implements the old {@link EmbeddedChannel} time freezing mechanics.
171      */
172     static final class FreezableTicker implements MockTicker {
173         private final Ticker unfrozen = Ticker.systemTicker();
174         /**
175          * When time is not {@link #timeFrozen frozen}, the base time to subtract from {@link System#nanoTime()}. When
176          * time is frozen, this variable is unused.
177          */
178         private long startTime;
179         /**
180          * When time is frozen, the timestamp returned by {@link #getCurrentTimeNanos()}. When unfrozen, this is unused.
181          */
182         private long frozenTimestamp;
183         /**
184          * Whether time is currently frozen.
185          */
186         private boolean timeFrozen;
187 
188         @Override
189         public void advance(long amount, TimeUnit unit) {
190             long nanos = unit.toNanos(amount);
191             if (timeFrozen) {
192                 frozenTimestamp += nanos;
193             } else {
194                 // startTime is subtracted from nanoTime, so increasing the startTime will advance
195                 // getCurrentTimeNanos
196                 startTime -= nanos;
197             }
198         }
199 
200         @Override
201         public long nanoTime() {
202             if (timeFrozen) {
203                 return frozenTimestamp;
204             }
205             return unfrozen.nanoTime() - startTime;
206         }
207 
208         @Override
209         public void sleep(long delay, TimeUnit unit) throws InterruptedException {
210             throw new UnsupportedOperationException("Sleeping is not supported by the default ticker for " +
211                     "EmbeddedEventLoop. Please use a different ticker implementation if you require sleep support.");
212         }
213 
214         public void freezeTime() {
215             if (!timeFrozen) {
216                 frozenTimestamp = nanoTime();
217                 timeFrozen = true;
218             }
219         }
220 
221         public void unfreezeTime() {
222             if (timeFrozen) {
223                 // we want getCurrentTimeNanos to continue right where frozenTimestamp left off:
224                 // nanoTime = unfrozen.nanoTime - startTime = frozenTimestamp
225                 // then solve for startTime
226                 startTime = unfrozen.nanoTime() - frozenTimestamp;
227                 timeFrozen = false;
228             }
229         }
230     }
231 }