1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.embedder;
17
18 import org.jboss.netty.buffer.ChannelBufferFactory;
19 import org.jboss.netty.channel.Channel;
20 import org.jboss.netty.channel.ChannelEvent;
21 import org.jboss.netty.channel.ChannelFuture;
22 import org.jboss.netty.channel.ChannelHandler;
23 import org.jboss.netty.channel.ChannelHandlerContext;
24 import org.jboss.netty.channel.ChannelPipeline;
25 import org.jboss.netty.channel.ChannelPipelineException;
26 import org.jboss.netty.channel.ChannelSink;
27 import org.jboss.netty.channel.ChannelUpstreamHandler;
28 import org.jboss.netty.channel.DefaultChannelPipeline;
29 import org.jboss.netty.channel.ExceptionEvent;
30 import org.jboss.netty.channel.MessageEvent;
31
32 import java.lang.reflect.Array;
33 import java.util.ConcurrentModificationException;
34 import java.util.LinkedList;
35 import java.util.Queue;
36
37 import static org.jboss.netty.channel.Channels.*;
38
39
40
41
42 abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
43
44 private final Channel channel;
45 private final ChannelPipeline pipeline;
46 private final EmbeddedChannelSink sink = new EmbeddedChannelSink();
47
48 final Queue<Object> productQueue = new LinkedList<Object>();
49
50
51
52
53
54 protected AbstractCodecEmbedder(ChannelHandler... handlers) {
55 pipeline = new EmbeddedChannelPipeline();
56 configurePipeline(handlers);
57 channel = new EmbeddedChannel(pipeline, sink);
58 fireInitialEvents();
59 }
60
61
62
63
64
65
66
67
68 protected AbstractCodecEmbedder(ChannelBufferFactory bufferFactory, ChannelHandler... handlers) {
69 this(handlers);
70 getChannel().getConfig().setBufferFactory(bufferFactory);
71 }
72
73 private void fireInitialEvents() {
74
75 fireChannelOpen(channel);
76 fireChannelBound(channel, channel.getLocalAddress());
77 fireChannelConnected(channel, channel.getRemoteAddress());
78 }
79
80 private void configurePipeline(ChannelHandler... handlers) {
81 if (handlers == null) {
82 throw new NullPointerException("handlers");
83 }
84
85 if (handlers.length == 0) {
86 throw new IllegalArgumentException(
87 "handlers should contain at least one " +
88 ChannelHandler.class.getSimpleName() + '.');
89 }
90
91 for (int i = 0; i < handlers.length; i ++) {
92 ChannelHandler h = handlers[i];
93 if (h == null) {
94 throw new NullPointerException("handlers[" + i + ']');
95 }
96 pipeline.addLast(String.valueOf(i), handlers[i]);
97 }
98 pipeline.addLast("SINK", sink);
99 }
100
101 public boolean finish() {
102 close(channel);
103 fireChannelDisconnected(channel);
104 fireChannelUnbound(channel);
105 fireChannelClosed(channel);
106 return !productQueue.isEmpty();
107 }
108
109
110
111
112
113 protected final Channel getChannel() {
114 return channel;
115 }
116
117
118
119
120
121 protected final boolean isEmpty() {
122 return productQueue.isEmpty();
123 }
124
125 public final E poll() {
126 return (E) productQueue.poll();
127 }
128
129 public final E peek() {
130 return (E) productQueue.peek();
131 }
132
133 public final Object[] pollAll() {
134 final int size = size();
135 Object[] a = new Object[size];
136 for (int i = 0; i < size; i ++) {
137 E product = poll();
138 if (product == null) {
139 throw new ConcurrentModificationException();
140 }
141 a[i] = product;
142 }
143 return a;
144 }
145
146 @SuppressWarnings("unchecked")
147 public final <T> T[] pollAll(T[] a) {
148 if (a == null) {
149 throw new NullPointerException("a");
150 }
151
152 final int size = size();
153
154
155 if (a.length < size) {
156 a = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
157 }
158
159 for (int i = 0;; i ++) {
160 T product = (T) poll();
161 if (product == null) {
162 break;
163 }
164 a[i] = product;
165 }
166
167
168 if (a.length > size) {
169 a[size] = null;
170 }
171
172 return a;
173 }
174
175 public final int size() {
176 return productQueue.size();
177 }
178
179 public ChannelPipeline getPipeline() {
180 return pipeline;
181 }
182
183 private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler {
184 EmbeddedChannelSink() {
185 }
186
187 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
188 handleEvent(e);
189 }
190
191 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
192 handleEvent(e);
193 }
194
195 private void handleEvent(ChannelEvent e) {
196 if (e instanceof MessageEvent) {
197 boolean offered = productQueue.offer(((MessageEvent) e).getMessage());
198 assert offered;
199 } else if (e instanceof ExceptionEvent) {
200 throw new CodecEmbedderException(((ExceptionEvent) e).getCause());
201 }
202
203
204 }
205
206 public void exceptionCaught(
207 ChannelPipeline pipeline, ChannelEvent e,
208 ChannelPipelineException cause) throws Exception {
209 Throwable actualCause = cause.getCause();
210 if (actualCause == null) {
211 actualCause = cause;
212 }
213
214 throw new CodecEmbedderException(actualCause);
215 }
216
217 public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
218 try {
219 task.run();
220 return succeededFuture(pipeline.getChannel());
221 } catch (Throwable t) {
222 return failedFuture(pipeline.getChannel(), t);
223 }
224 }
225 }
226
227 private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
228
229 EmbeddedChannelPipeline() {
230 }
231
232 @Override
233 protected void notifyHandlerException(ChannelEvent e, Throwable t) {
234 while (t instanceof ChannelPipelineException && t.getCause() != null) {
235 t = t.getCause();
236 }
237 if (t instanceof CodecEmbedderException) {
238 throw (CodecEmbedderException) t;
239 } else {
240 throw new CodecEmbedderException(t);
241 }
242 }
243 }
244 }