View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.embedded;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerAdapter;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelMetadata;
26  import io.netty.channel.ChannelOutboundBuffer;
27  import io.netty.channel.ChannelPipeline;
28  import io.netty.channel.ChannelPromise;
29  import io.netty.channel.DefaultChannelConfig;
30  import io.netty.channel.EventLoop;
31  import io.netty.util.ReferenceCountUtil;
32  import io.netty.util.internal.PlatformDependent;
33  import io.netty.util.internal.RecyclableArrayList;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.net.SocketAddress;
38  import java.nio.channels.ClosedChannelException;
39  import java.util.ArrayDeque;
40  import java.util.Queue;
41  
42  /**
43   * Base class for {@link Channel} implementations that are used in an embedded fashion.
44   */
45  public class EmbeddedChannel extends AbstractChannel {
46  
47      private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
48      private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
49  
50      private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
51      private enum State { OPEN, ACTIVE, CLOSED }
52  
53      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
54  
55      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
56  
57      private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
58      private final ChannelConfig config = new DefaultChannelConfig(this);
59      private final Queue<Object> inboundMessages = new ArrayDeque<Object>();
60      private final Queue<Object> outboundMessages = new ArrayDeque<Object>();
61      private Throwable lastException;
62      private State state;
63  
64      /**
65       * Create a new instance with an empty pipeline.
66       */
67      public EmbeddedChannel() {
68          this(EMPTY_HANDLERS);
69      }
70  
71      /**
72       * Create a new instance with the pipeline initialized with the specified handlers.
73       *
74       * @param handlers the @link ChannelHandler}s which will be add in the {@link ChannelPipeline}
75       */
76      public EmbeddedChannel(ChannelHandler... handlers) {
77          super(null, EmbeddedChannelId.INSTANCE);
78  
79          if (handlers == null) {
80              throw new NullPointerException("handlers");
81          }
82  
83          ChannelPipeline p = pipeline();
84          for (ChannelHandler h: handlers) {
85              if (h == null) {
86                  break;
87              }
88              p.addLast(h);
89          }
90  
91          loop.register(this);
92          p.addLast(new LastInboundHandler());
93      }
94  
95      @Override
96      public ChannelMetadata metadata() {
97          return METADATA;
98      }
99  
100     @Override
101     public ChannelConfig config() {
102         return config;
103     }
104 
105     @Override
106     public boolean isOpen() {
107         return state != State.CLOSED;
108     }
109 
110     @Override
111     public boolean isActive() {
112         return state == State.ACTIVE;
113     }
114 
115     /**
116      * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
117      */
118     public Queue<Object> inboundMessages() {
119         return inboundMessages;
120     }
121 
122     /**
123      * @deprecated use {@link #inboundMessages()}
124      */
125     @Deprecated
126     public Queue<Object> lastInboundBuffer() {
127         return inboundMessages();
128     }
129 
130     /**
131      * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
132      */
133     public Queue<Object> outboundMessages() {
134         return outboundMessages;
135     }
136 
137     /**
138      * @deprecated use {@link #outboundMessages()}
139      */
140     @Deprecated
141     public Queue<Object> lastOutboundBuffer() {
142         return outboundMessages();
143     }
144 
145     /**
146      * Return received data from this {@link Channel}
147      */
148     @SuppressWarnings("unchecked")
149     public <T> T readInbound() {
150         return (T) inboundMessages.poll();
151     }
152 
153     /**
154      * Read data froum the outbound. This may return {@code null} if nothing is readable.
155      */
156     @SuppressWarnings("unchecked")
157     public <T> T readOutbound() {
158         return (T) outboundMessages.poll();
159     }
160 
161     /**
162      * Write messages to the inbound of this {@link Channel}.
163      *
164      * @param msgs the messages to be written
165      *
166      * @return {@code true} if the write operation did add something to the inbound buffer
167      */
168     public boolean writeInbound(Object... msgs) {
169         ensureOpen();
170         if (msgs.length == 0) {
171             return !inboundMessages.isEmpty();
172         }
173 
174         ChannelPipeline p = pipeline();
175         for (Object m: msgs) {
176             p.fireChannelRead(m);
177         }
178         p.fireChannelReadComplete();
179         runPendingTasks();
180         checkException();
181         return !inboundMessages.isEmpty();
182     }
183 
184     /**
185      * Write messages to the outbound of this {@link Channel}.
186      *
187      * @param msgs              the messages to be written
188      * @return bufferReadable   returns {@code true} if the write operation did add something to the outbound buffer
189      */
190     public boolean writeOutbound(Object... msgs) {
191         ensureOpen();
192         if (msgs.length == 0) {
193             return !outboundMessages.isEmpty();
194         }
195 
196         RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
197         try {
198             for (Object m: msgs) {
199                 if (m == null) {
200                     break;
201                 }
202                 futures.add(write(m));
203             }
204 
205             flush();
206 
207             int size = futures.size();
208             for (int i = 0; i < size; i++) {
209                 ChannelFuture future = (ChannelFuture) futures.get(i);
210                 assert future.isDone();
211                 if (future.cause() != null) {
212                     recordException(future.cause());
213                 }
214             }
215 
216             runPendingTasks();
217             checkException();
218             return !outboundMessages.isEmpty();
219         } finally {
220             futures.recycle();
221         }
222     }
223 
224     /**
225      * Mark this {@link Channel} as finished. Any futher try to write data to it will fail.
226      *
227      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
228      */
229     public boolean finish() {
230         close();
231         runPendingTasks();
232 
233         // Cancel all scheduled tasks that are left.
234         loop.cancelScheduledTasks();
235 
236         checkException();
237 
238         return !inboundMessages.isEmpty() || !outboundMessages.isEmpty();
239     }
240 
241     /**
242      * Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
243      * for this {@link Channel}
244      */
245     public void runPendingTasks() {
246         try {
247             loop.runTasks();
248         } catch (Exception e) {
249             recordException(e);
250         }
251 
252         try {
253             loop.runScheduledTasks();
254         } catch (Exception e) {
255             recordException(e);
256         }
257     }
258 
259     /**
260      * Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
261      * {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
262      * {@code -1}.
263      */
264     public long runScheduledPendingTasks() {
265         try {
266             return loop.runScheduledTasks();
267         } catch (Exception e) {
268             recordException(e);
269             return loop.nextScheduledTask();
270         }
271     }
272 
273     private void recordException(Throwable cause) {
274         if (lastException == null) {
275             lastException = cause;
276         } else {
277             logger.warn(
278                     "More than one exception was raised. " +
279                             "Will report only the first one and log others.", cause);
280         }
281     }
282 
283     /**
284      * Check if there was any {@link Throwable} received and if so rethrow it.
285      */
286     public void checkException() {
287         Throwable t = lastException;
288         if (t == null) {
289             return;
290         }
291 
292         lastException = null;
293 
294         PlatformDependent.throwException(t);
295     }
296 
297     /**
298      * Ensure the {@link Channel} is open and of not throw an exception.
299      */
300     protected final void ensureOpen() {
301         if (!isOpen()) {
302             recordException(new ClosedChannelException());
303             checkException();
304         }
305     }
306 
307     @Override
308     protected boolean isCompatible(EventLoop loop) {
309         return loop instanceof EmbeddedEventLoop;
310     }
311 
312     @Override
313     protected SocketAddress localAddress0() {
314         return isActive()? LOCAL_ADDRESS : null;
315     }
316 
317     @Override
318     protected SocketAddress remoteAddress0() {
319         return isActive()? REMOTE_ADDRESS : null;
320     }
321 
322     @Override
323     protected void doRegister() throws Exception {
324         state = State.ACTIVE;
325     }
326 
327     @Override
328     protected void doBind(SocketAddress localAddress) throws Exception {
329         // NOOP
330     }
331 
332     @Override
333     protected void doDisconnect() throws Exception {
334         doClose();
335     }
336 
337     @Override
338     protected void doClose() throws Exception {
339         state = State.CLOSED;
340     }
341 
342     @Override
343     protected void doBeginRead() throws Exception {
344         // NOOP
345     }
346 
347     @Override
348     protected AbstractUnsafe newUnsafe() {
349         return new DefaultUnsafe();
350     }
351 
352     @Override
353     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
354         for (;;) {
355             Object msg = in.current();
356             if (msg == null) {
357                 break;
358             }
359 
360             ReferenceCountUtil.retain(msg);
361             outboundMessages.add(msg);
362             in.remove();
363         }
364     }
365 
366     private class DefaultUnsafe extends AbstractUnsafe {
367         @Override
368         public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
369             safeSetSuccess(promise);
370         }
371     }
372 
373     private final class LastInboundHandler extends ChannelHandlerAdapter {
374         @Override
375         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
376             inboundMessages.add(msg);
377         }
378 
379         @Override
380         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
381             recordException(cause);
382         }
383     }
384 }