1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.queue;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.channel.Channel;
20 import org.jboss.netty.channel.ChannelEvent;
21 import org.jboss.netty.channel.ChannelHandlerContext;
22 import org.jboss.netty.channel.ChannelPipeline;
23 import org.jboss.netty.channel.ChannelStateEvent;
24 import org.jboss.netty.channel.ExceptionEvent;
25 import org.jboss.netty.channel.MessageEvent;
26 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
27 import org.jboss.netty.util.internal.DeadLockProofWorker;
28
29 import java.io.IOException;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.TimeUnit;
33
34 /**
35 * Emulates blocking read operation. This handler stores all received messages
36 * into a {@link BlockingQueue} and returns the received messages when
37 * {@link #read()}, {@link #read(long, TimeUnit)}, {@link #readEvent()}, or
38 * {@link #readEvent(long, TimeUnit)} method is called.
39 * <p>
40 * Please note that this handler is only useful for the cases where there are
41 * very small number of connections, such as testing and simple client-side
42 * application development.
43 * <p>
44 * Also, any handler placed after this handler will never receive
45 * {@code messageReceived}, {@code exceptionCaught}, and {@code channelClosed}
46 * events, hence it should be placed in the last place in a pipeline.
47 * <p>
48 * Here is an example that demonstrates the usage:
49 * <pre>
50 * {@link BlockingReadHandler}<{@link ChannelBuffer}> reader =
51 * new {@link BlockingReadHandler}<{@link ChannelBuffer}>();
52 * {@link ChannelPipeline} p = ...;
53 * p.addLast("reader", reader);
54 *
55 * ...
56 *
57 * // Read a message from a channel in a blocking manner.
58 * try {
59 * {@link ChannelBuffer} buf = reader.read(60, TimeUnit.SECONDS);
60 * if (buf == null) {
61 * // Connection closed.
62 * } else {
63 * // Handle the received message here.
64 * }
65 * } catch ({@link BlockingReadTimeoutException} e) {
66 * // Read timed out.
67 * } catch (IOException e) {
68 * // Other read errors
69 * }
70 * </pre>
71 *
72 * @param <E> the type of the received messages
73 */
74 public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler {
75
76 private final BlockingQueue<ChannelEvent> queue;
77 private volatile boolean closed;
78
79 /**
80 * Creates a new instance with {@link LinkedBlockingQueue}
81 */
82 public BlockingReadHandler() {
83 this(new LinkedBlockingQueue<ChannelEvent>());
84 }
85
86 /**
87 * Creates a new instance with the specified {@link BlockingQueue}.
88 */
89 public BlockingReadHandler(BlockingQueue<ChannelEvent> queue) {
90 if (queue == null) {
91 throw new NullPointerException("queue");
92 }
93 this.queue = queue;
94 }
95
96 /**
97 * Returns the queue which stores the received messages. The default
98 * implementation returns the queue which was specified in the constructor.
99 */
100 protected BlockingQueue<ChannelEvent> getQueue() {
101 return queue;
102 }
103
104 /**
105 * Returns {@code true} if and only if the {@link Channel} associated with
106 * this handler has been closed.
107 *
108 * @throws IllegalStateException
109 * if this handler was not added to a {@link ChannelPipeline} yet
110 */
111 public boolean isClosed() {
112 return closed;
113 }
114
115 /**
116 * Waits until a new message is received or the associated {@link Channel}
117 * is closed.
118 *
119 * @return the received message or {@code null} if the associated
120 * {@link Channel} has been closed
121 * @throws IOException
122 * if failed to receive a new message
123 * @throws InterruptedException
124 * if the operation has been interrupted
125 */
126 public E read() throws IOException, InterruptedException {
127 ChannelEvent e = readEvent();
128 if (e == null) {
129 return null;
130 }
131
132 if (e instanceof MessageEvent) {
133 return getMessage((MessageEvent) e);
134 } else if (e instanceof ExceptionEvent) {
135 throw (IOException) new IOException().initCause(((ExceptionEvent) e).getCause());
136 } else {
137 throw new IllegalStateException();
138 }
139 }
140
141 /**
142 * Waits until a new message is received or the associated {@link Channel}
143 * is closed.
144 *
145 * @param timeout
146 * the amount time to wait until a new message is received.
147 * If no message is received within the timeout,
148 * {@link BlockingReadTimeoutException} is thrown.
149 * @param unit
150 * the unit of {@code timeout}
151 *
152 * @return the received message or {@code null} if the associated
153 * {@link Channel} has been closed
154 * @throws BlockingReadTimeoutException
155 * if no message was received within the specified timeout
156 * @throws IOException
157 * if failed to receive a new message
158 * @throws InterruptedException
159 * if the operation has been interrupted
160 */
161 public E read(long timeout, TimeUnit unit) throws IOException, InterruptedException {
162 ChannelEvent e = readEvent(timeout, unit);
163 if (e == null) {
164 return null;
165 }
166
167 if (e instanceof MessageEvent) {
168 return getMessage((MessageEvent) e);
169 } else if (e instanceof ExceptionEvent) {
170 throw (IOException) new IOException().initCause(((ExceptionEvent) e).getCause());
171 } else {
172 throw new IllegalStateException();
173 }
174 }
175
176 /**
177 * Waits until a new {@link ChannelEvent} is received or the associated
178 * {@link Channel} is closed.
179 *
180 * @return a {@link MessageEvent} or an {@link ExceptionEvent}.
181 * {@code null} if the associated {@link Channel} has been closed
182 * @throws InterruptedException
183 * if the operation has been interrupted
184 */
185 public ChannelEvent readEvent() throws InterruptedException {
186 detectDeadLock();
187 if (isClosed()) {
188 if (getQueue().isEmpty()) {
189 return null;
190 }
191 }
192
193 ChannelEvent e = getQueue().take();
194 if (e instanceof ChannelStateEvent) {
195 // channelClosed has been triggered.
196 assert closed;
197 return null;
198 } else {
199 return e;
200 }
201 }
202
203 /**
204 * Waits until a new {@link ChannelEvent} is received or the associated
205 * {@link Channel} is closed.
206 *
207 * @param timeout
208 * the amount time to wait until a new {@link ChannelEvent} is
209 * received. If no message is received within the timeout,
210 * {@link BlockingReadTimeoutException} is thrown.
211 * @param unit
212 * the unit of {@code timeout}
213 *
214 * @return a {@link MessageEvent} or an {@link ExceptionEvent}.
215 * {@code null} if the associated {@link Channel} has been closed
216 * @throws BlockingReadTimeoutException
217 * if no event was received within the specified timeout
218 * @throws InterruptedException
219 * if the operation has been interrupted
220 */
221 public ChannelEvent readEvent(
222 long timeout, TimeUnit unit) throws InterruptedException, BlockingReadTimeoutException {
223 detectDeadLock();
224 if (isClosed()) {
225 if (getQueue().isEmpty()) {
226 return null;
227 }
228 }
229
230 ChannelEvent e = getQueue().poll(timeout, unit);
231 if (e == null) {
232 throw new BlockingReadTimeoutException();
233 } else if (e instanceof ChannelStateEvent) {
234 // channelClosed has been triggered.
235 assert closed;
236 return null;
237 } else {
238 return e;
239 }
240 }
241
242 private static void detectDeadLock() {
243 if (DeadLockProofWorker.PARENT.get() != null) {
244 throw new IllegalStateException(
245 "read*(...) in I/O thread causes a dead lock or " +
246 "sudden performance drop. Implement a state machine or " +
247 "call read*() from a different thread.");
248 }
249 }
250
251 @Override
252 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
253 throws Exception {
254 getQueue().put(e);
255 }
256
257 @Override
258 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
259 throws Exception {
260 getQueue().put(e);
261 }
262
263 @Override
264 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
265 throws Exception {
266 closed = true;
267 getQueue().put(e);
268 }
269
270 @SuppressWarnings("unchecked")
271 private E getMessage(MessageEvent e) {
272 return (E) e.getMessage();
273 }
274 }