1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.embedder;
17
18 import org.jboss.netty.buffer.ChannelBufferFactory;
19 import org.jboss.netty.channel.Channel;
20 import org.jboss.netty.channel.ChannelEvent;
21 import org.jboss.netty.channel.ChannelFuture;
22 import org.jboss.netty.channel.ChannelHandler;
23 import org.jboss.netty.channel.ChannelHandlerContext;
24 import org.jboss.netty.channel.ChannelPipeline;
25 import org.jboss.netty.channel.ChannelPipelineException;
26 import org.jboss.netty.channel.ChannelSink;
27 import org.jboss.netty.channel.ChannelUpstreamHandler;
28 import org.jboss.netty.channel.DefaultChannelPipeline;
29 import org.jboss.netty.channel.ExceptionEvent;
30 import org.jboss.netty.channel.MessageEvent;
31
32 import java.lang.reflect.Array;
33 import java.util.ConcurrentModificationException;
34 import java.util.LinkedList;
35 import java.util.Queue;
36
37 import static org.jboss.netty.channel.Channels.*;
38
39
40
41
42 abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
43
44 private final Channel channel;
45 private final ChannelPipeline pipeline;
46 private final EmbeddedChannelSink sink = new EmbeddedChannelSink();
47
48 final Queue<Object> productQueue = new LinkedList<Object>();
49
50
51
52
53
54 protected AbstractCodecEmbedder(ChannelHandler... handlers) {
55 pipeline = new EmbeddedChannelPipeline();
56 configurePipeline(handlers);
57 channel = new EmbeddedChannel(pipeline, sink);
58 fireInitialEvents();
59 }
60
61
62
63
64
65
66
67
68 protected AbstractCodecEmbedder(ChannelBufferFactory bufferFactory, ChannelHandler... handlers) {
69 this(handlers);
70 getChannel().getConfig().setBufferFactory(bufferFactory);
71 }
72
73 private void fireInitialEvents() {
74
75 fireChannelOpen(channel);
76 fireChannelBound(channel, channel.getLocalAddress());
77 fireChannelConnected(channel, channel.getRemoteAddress());
78 }
79
80 private void configurePipeline(ChannelHandler... handlers) {
81 if (handlers == null) {
82 throw new NullPointerException("handlers");
83 }
84
85 if (handlers.length == 0) {
86 throw new IllegalArgumentException(
87 "handlers should contain at least one " +
88 ChannelHandler.class.getSimpleName() + '.');
89 }
90
91 for (int i = 0; i < handlers.length; i ++) {
92 ChannelHandler h = handlers[i];
93 if (h == null) {
94 throw new NullPointerException("handlers[" + i + ']');
95 }
96 pipeline.addLast(String.valueOf(i), handlers[i]);
97 }
98 pipeline.addLast("SINK", sink);
99 }
100
101 public boolean finish() {
102 close(channel);
103 fireChannelDisconnected(channel);
104 fireChannelUnbound(channel);
105 fireChannelClosed(channel);
106 return !productQueue.isEmpty();
107 }
108
109
110
111
112
113 protected final Channel getChannel() {
114 return channel;
115 }
116
117
118
119
120
121 protected final boolean isEmpty() {
122 return productQueue.isEmpty();
123 }
124
125 @SuppressWarnings("unchecked")
126 public final E poll() {
127 return (E) productQueue.poll();
128 }
129
130 @SuppressWarnings("unchecked")
131 public final E peek() {
132 return (E) productQueue.peek();
133 }
134
135 public final Object[] pollAll() {
136 final int size = size();
137 Object[] a = new Object[size];
138 for (int i = 0; i < size; i ++) {
139 E product = poll();
140 if (product == null) {
141 throw new ConcurrentModificationException();
142 }
143 a[i] = product;
144 }
145 return a;
146 }
147
148 @SuppressWarnings("unchecked")
149 public final <T> T[] pollAll(T[] a) {
150 if (a == null) {
151 throw new NullPointerException("a");
152 }
153
154 final int size = size();
155
156
157 if (a.length < size) {
158 a = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
159 }
160
161 for (int i = 0;; i ++) {
162 T product = (T) poll();
163 if (product == null) {
164 break;
165 }
166 a[i] = product;
167 }
168
169
170 if (a.length > size) {
171 a[size] = null;
172 }
173
174 return a;
175 }
176
177 public final int size() {
178 return productQueue.size();
179 }
180
181 public ChannelPipeline getPipeline() {
182 return pipeline;
183 }
184
185 private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler {
186 EmbeddedChannelSink() {
187 }
188
189 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
190 handleEvent(e);
191 }
192
193 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
194 handleEvent(e);
195 }
196
197 private void handleEvent(ChannelEvent e) {
198 if (e instanceof MessageEvent) {
199 boolean offered = productQueue.offer(((MessageEvent) e).getMessage());
200 assert offered;
201 } else if (e instanceof ExceptionEvent) {
202 throw new CodecEmbedderException(((ExceptionEvent) e).getCause());
203 }
204
205
206 }
207
208 public void exceptionCaught(
209 ChannelPipeline pipeline, ChannelEvent e,
210 ChannelPipelineException cause) throws Exception {
211 Throwable actualCause = cause.getCause();
212 if (actualCause == null) {
213 actualCause = cause;
214 }
215
216 throw new CodecEmbedderException(actualCause);
217 }
218
219 public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
220 try {
221 task.run();
222 return succeededFuture(pipeline.getChannel());
223 } catch (Throwable t) {
224 return failedFuture(pipeline.getChannel(), t);
225 }
226 }
227 }
228
229 private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
230
231 EmbeddedChannelPipeline() {
232 }
233
234 @Override
235 protected void notifyHandlerException(ChannelEvent e, Throwable t) {
236 while (t instanceof ChannelPipelineException && t.getCause() != null) {
237 t = t.getCause();
238 }
239 if (t instanceof CodecEmbedderException) {
240 throw (CodecEmbedderException) t;
241 } else {
242 throw new CodecEmbedderException(t);
243 }
244 }
245 }
246 }