View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.embedder;
17  
18  import org.jboss.netty.buffer.ChannelBufferFactory;
19  import org.jboss.netty.channel.Channel;
20  import org.jboss.netty.channel.ChannelEvent;
21  import org.jboss.netty.channel.ChannelFuture;
22  import org.jboss.netty.channel.ChannelHandler;
23  import org.jboss.netty.channel.ChannelHandlerContext;
24  import org.jboss.netty.channel.ChannelPipeline;
25  import org.jboss.netty.channel.ChannelPipelineException;
26  import org.jboss.netty.channel.ChannelSink;
27  import org.jboss.netty.channel.ChannelUpstreamHandler;
28  import org.jboss.netty.channel.DefaultChannelPipeline;
29  import org.jboss.netty.channel.ExceptionEvent;
30  import org.jboss.netty.channel.MessageEvent;
31  
32  import java.lang.reflect.Array;
33  import java.util.ConcurrentModificationException;
34  import java.util.LinkedList;
35  import java.util.Queue;
36  
37  import static org.jboss.netty.channel.Channels.*;
38  
39  /**
40   * A skeletal {@link CodecEmbedder} implementation.
41   */
42  abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
43  
44      private final Channel channel;
45      private final ChannelPipeline pipeline;
46      private final EmbeddedChannelSink sink = new EmbeddedChannelSink();
47  
48      final Queue<Object> productQueue = new LinkedList<Object>();
49  
50      /**
51       * Creates a new embedder whose pipeline is composed of the specified
52       * handlers.
53       */
54      protected AbstractCodecEmbedder(ChannelHandler... handlers) {
55          pipeline = new EmbeddedChannelPipeline();
56          configurePipeline(handlers);
57          channel = new EmbeddedChannel(pipeline, sink);
58          fireInitialEvents();
59      }
60  
61      /**
62       * Creates a new embedder whose pipeline is composed of the specified
63       * handlers.
64       *
65       * @param bufferFactory the {@link ChannelBufferFactory} to be used when
66       *                      creating a new buffer.
67       */
68      protected AbstractCodecEmbedder(ChannelBufferFactory bufferFactory, ChannelHandler... handlers) {
69          this(handlers);
70          getChannel().getConfig().setBufferFactory(bufferFactory);
71      }
72  
73      private void fireInitialEvents() {
74          // Fire the typical initial events.
75          fireChannelOpen(channel);
76          fireChannelBound(channel, channel.getLocalAddress());
77          fireChannelConnected(channel, channel.getRemoteAddress());
78      }
79  
80      private void configurePipeline(ChannelHandler... handlers) {
81          if (handlers == null) {
82              throw new NullPointerException("handlers");
83          }
84  
85          if (handlers.length == 0) {
86              throw new IllegalArgumentException(
87                      "handlers should contain at least one " +
88                      ChannelHandler.class.getSimpleName() + '.');
89          }
90  
91          for (int i = 0; i < handlers.length; i ++) {
92              ChannelHandler h = handlers[i];
93              if (h == null) {
94                  throw new NullPointerException("handlers[" + i + ']');
95              }
96              pipeline.addLast(String.valueOf(i), handlers[i]);
97          }
98          pipeline.addLast("SINK", sink);
99      }
100 
101     public boolean finish() {
102         close(channel);
103         fireChannelDisconnected(channel);
104         fireChannelUnbound(channel);
105         fireChannelClosed(channel);
106         return !productQueue.isEmpty();
107     }
108 
109     /**
110      * Returns the virtual {@link Channel} which will be used as a mock
111      * during encoding and decoding.
112      */
113     protected final Channel getChannel() {
114         return channel;
115     }
116 
117     /**
118      * Returns {@code true} if and only if the produce queue is empty and
119      * therefore {@link #poll()} will return {@code null}.
120      */
121     protected final boolean isEmpty() {
122         return productQueue.isEmpty();
123     }
124 
125     public final E poll() {
126         return (E) productQueue.poll();
127     }
128 
129     public final E peek() {
130         return (E) productQueue.peek();
131     }
132 
133     public final Object[] pollAll() {
134         final int size = size();
135         Object[] a = new Object[size];
136         for (int i = 0; i < size; i ++) {
137             E product = poll();
138             if (product == null) {
139                 throw new ConcurrentModificationException();
140             }
141             a[i] = product;
142         }
143         return a;
144     }
145 
146     @SuppressWarnings("unchecked")
147     public final <T> T[] pollAll(T[] a) {
148         if (a == null) {
149             throw new NullPointerException("a");
150         }
151 
152         final int size = size();
153 
154         // Create a new array if the specified one is too small.
155         if (a.length < size) {
156             a = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
157         }
158 
159         for (int i = 0;; i ++) {
160             T product = (T) poll();
161             if (product == null) {
162                 break;
163             }
164             a[i] = product;
165         }
166 
167         // Put the terminator if necessary.
168         if (a.length > size) {
169             a[size] = null;
170         }
171 
172         return a;
173     }
174 
175     public final int size() {
176         return productQueue.size();
177     }
178 
179     public ChannelPipeline getPipeline() {
180         return pipeline;
181     }
182 
183     private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler {
184         EmbeddedChannelSink() {
185         }
186 
187         public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
188             handleEvent(e);
189         }
190 
191         public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
192             handleEvent(e);
193         }
194 
195         private void handleEvent(ChannelEvent e) {
196             if (e instanceof MessageEvent) {
197                 boolean offered = productQueue.offer(((MessageEvent) e).getMessage());
198                 assert offered;
199             } else if (e instanceof ExceptionEvent) {
200                 throw new CodecEmbedderException(((ExceptionEvent) e).getCause());
201             }
202 
203             // Swallow otherwise.
204         }
205 
206         public void exceptionCaught(
207                 ChannelPipeline pipeline, ChannelEvent e,
208                 ChannelPipelineException cause) throws Exception {
209             Throwable actualCause = cause.getCause();
210             if (actualCause == null) {
211                 actualCause = cause;
212             }
213 
214             throw new CodecEmbedderException(actualCause);
215         }
216 
217         public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
218             try {
219                 task.run();
220                 return succeededFuture(pipeline.getChannel());
221             } catch (Throwable t) {
222                 return failedFuture(pipeline.getChannel(), t);
223             }
224         }
225     }
226 
227     private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
228 
229         EmbeddedChannelPipeline() {
230         }
231 
232         @Override
233         protected void notifyHandlerException(ChannelEvent e, Throwable t) {
234             while (t instanceof ChannelPipelineException && t.getCause() != null) {
235                 t = t.getCause();
236             }
237             if (t instanceof CodecEmbedderException) {
238                 throw (CodecEmbedderException) t;
239             } else {
240                 throw new CodecEmbedderException(t);
241             }
242         }
243     }
244 }