View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.embedded;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelHandlerInvoker;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.channel.DefaultChannelPromise;
24  import io.netty.util.concurrent.EventExecutor;
25  import io.netty.channel.EventLoop;
26  import io.netty.channel.EventLoopGroup;
27  import io.netty.util.concurrent.AbstractScheduledEventExecutor;
28  import io.netty.util.concurrent.Future;
29  
30  import java.net.SocketAddress;
31  import java.util.ArrayDeque;
32  import java.util.Queue;
33  import java.util.concurrent.TimeUnit;
34  
35  import static io.netty.channel.ChannelHandlerInvokerUtil.*;
36  
37  final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements ChannelHandlerInvoker, EventLoop {
38  
39      private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
40  
41      @Override
42      public EventLoop unwrap() {
43          return this;
44      }
45  
46      @Override
47      public EventLoopGroup parent() {
48          return (EventLoopGroup) super.parent();
49      }
50  
51      @Override
52      public EventLoop next() {
53          return (EventLoop) super.next();
54      }
55  
56      @Override
57      public void execute(Runnable command) {
58          if (command == null) {
59              throw new NullPointerException("command");
60          }
61          tasks.add(command);
62      }
63  
64      void runTasks() {
65          for (;;) {
66              Runnable task = tasks.poll();
67              if (task == null) {
68                  break;
69              }
70  
71              task.run();
72          }
73      }
74  
75      long runScheduledTasks() {
76          long time = AbstractScheduledEventExecutor.nanoTime();
77          for (;;) {
78              Runnable task = pollScheduledTask(time);
79              if (task == null) {
80                  return nextScheduledTaskNano();
81              }
82  
83              task.run();
84          }
85      }
86  
87      long nextScheduledTask() {
88          return nextScheduledTaskNano();
89      }
90  
91      @Override
92      protected void cancelScheduledTasks() {
93          super.cancelScheduledTasks();
94      }
95  
96      @Override
97      public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
98          throw new UnsupportedOperationException();
99      }
100 
101     @Override
102     public Future<?> terminationFuture() {
103         throw new UnsupportedOperationException();
104     }
105 
106     @Override
107     @Deprecated
108     public void shutdown() {
109         throw new UnsupportedOperationException();
110     }
111 
112     @Override
113     public boolean isShuttingDown() {
114         return false;
115     }
116 
117     @Override
118     public boolean isShutdown() {
119         return false;
120     }
121 
122     @Override
123     public boolean isTerminated() {
124         return false;
125     }
126 
127     @Override
128     public boolean awaitTermination(long timeout, TimeUnit unit) {
129         return false;
130     }
131 
132     @Override
133     public ChannelFuture register(Channel channel) {
134         return register(channel, new DefaultChannelPromise(channel, this));
135     }
136 
137     @Override
138     public ChannelFuture register(Channel channel, ChannelPromise promise) {
139         channel.unsafe().register(this, promise);
140         return promise;
141     }
142 
143     @Override
144     public boolean inEventLoop() {
145         return true;
146     }
147 
148     @Override
149     public boolean inEventLoop(Thread thread) {
150         return true;
151     }
152 
153     @Override
154     public ChannelHandlerInvoker asInvoker() {
155         return this;
156     }
157 
158     @Override
159     public EventExecutor executor() {
160         return this;
161     }
162 
163     @Override
164     public void invokeChannelRegistered(ChannelHandlerContext ctx) {
165         invokeChannelRegisteredNow(ctx);
166     }
167 
168     @Override
169     public void invokeChannelUnregistered(ChannelHandlerContext ctx) {
170         invokeChannelUnregisteredNow(ctx);
171     }
172 
173     @Override
174     public void invokeChannelActive(ChannelHandlerContext ctx) {
175         invokeChannelActiveNow(ctx);
176     }
177 
178     @Override
179     public void invokeChannelInactive(ChannelHandlerContext ctx) {
180         invokeChannelInactiveNow(ctx);
181     }
182 
183     @Override
184     public void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
185         invokeExceptionCaughtNow(ctx, cause);
186     }
187 
188     @Override
189     public void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event) {
190         invokeUserEventTriggeredNow(ctx, event);
191     }
192 
193     @Override
194     public void invokeChannelRead(ChannelHandlerContext ctx, Object msg) {
195         invokeChannelReadNow(ctx, msg);
196     }
197 
198     @Override
199     public void invokeChannelReadComplete(ChannelHandlerContext ctx) {
200         invokeChannelReadCompleteNow(ctx);
201     }
202 
203     @Override
204     public void invokeChannelWritabilityChanged(ChannelHandlerContext ctx) {
205         invokeChannelWritabilityChangedNow(ctx);
206     }
207 
208     @Override
209     public void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
210         invokeBindNow(ctx, localAddress, promise);
211     }
212 
213     @Override
214     public void invokeConnect(
215             ChannelHandlerContext ctx,
216             SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
217         invokeConnectNow(ctx, remoteAddress, localAddress, promise);
218     }
219 
220     @Override
221     public void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
222         invokeDisconnectNow(ctx, promise);
223     }
224 
225     @Override
226     public void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise) {
227         invokeCloseNow(ctx, promise);
228     }
229 
230     @Override
231     public void invokeDeregister(ChannelHandlerContext ctx, final ChannelPromise promise) {
232         invokeDeregisterNow(ctx, promise);
233     }
234 
235     @Override
236     public void invokeRead(ChannelHandlerContext ctx) {
237         invokeReadNow(ctx);
238     }
239 
240     @Override
241     public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
242         invokeWriteNow(ctx, msg, promise);
243     }
244 
245     @Override
246     public void invokeFlush(ChannelHandlerContext ctx) {
247         invokeFlushNow(ctx);
248     }
249 }