View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.queue;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.channel.Channel;
20  import org.jboss.netty.channel.ChannelEvent;
21  import org.jboss.netty.channel.ChannelHandlerContext;
22  import org.jboss.netty.channel.ChannelPipeline;
23  import org.jboss.netty.channel.ChannelStateEvent;
24  import org.jboss.netty.channel.ExceptionEvent;
25  import org.jboss.netty.channel.MessageEvent;
26  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
27  import org.jboss.netty.util.internal.DeadLockProofWorker;
28  
29  import java.io.IOException;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.LinkedBlockingQueue;
32  import java.util.concurrent.TimeUnit;
33  
34  /**
35   * Emulates blocking read operation.  This handler stores all received messages
36   * into a {@link BlockingQueue} and returns the received messages when
37   * {@link #read()}, {@link #read(long, TimeUnit)}, {@link #readEvent()}, or
38   * {@link #readEvent(long, TimeUnit)} method is called.
39   * <p>
40   * Please note that this handler is only useful for the cases where there are
41   * very small number of connections, such as testing and simple client-side
42   * application development.
43   * <p>
44   * Also, any handler placed after this handler will never receive
45   * {@code messageReceived}, {@code exceptionCaught}, and {@code channelClosed}
46   * events, hence it should be placed in the last place in a pipeline.
47   * <p>
48   * Here is an example that demonstrates the usage:
49   * <pre>
50   * {@link BlockingReadHandler}&lt;{@link ChannelBuffer}&gt; reader =
51   *         new {@link BlockingReadHandler}&lt;{@link ChannelBuffer}&gt;();
52   * {@link ChannelPipeline} p = ...;
53   * p.addLast("reader", reader);
54   *
55   * ...
56   *
57   * // Read a message from a channel in a blocking manner.
58   * try {
59   *     {@link ChannelBuffer} buf = reader.read(60, TimeUnit.SECONDS);
60   *     if (buf == null) {
61   *         // Connection closed.
62   *     } else {
63   *         // Handle the received message here.
64   *     }
65   * } catch ({@link BlockingReadTimeoutException} e) {
66   *     // Read timed out.
67   * } catch (IOException e) {
68   *     // Other read errors
69   * }
70   * </pre>
71   *
72   * @param <E> the type of the received messages
73   */
74  public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler {
75  
76      private final BlockingQueue<ChannelEvent> queue;
77      private volatile boolean closed;
78  
79      /**
80       * Creates a new instance with {@link LinkedBlockingQueue}
81       */
82      public BlockingReadHandler() {
83          this(new LinkedBlockingQueue<ChannelEvent>());
84      }
85  
86      /**
87       * Creates a new instance with the specified {@link BlockingQueue}.
88       */
89      public BlockingReadHandler(BlockingQueue<ChannelEvent> queue) {
90          if (queue == null) {
91              throw new NullPointerException("queue");
92          }
93          this.queue = queue;
94      }
95  
96      /**
97       * Returns the queue which stores the received messages.  The default
98       * implementation returns the queue which was specified in the constructor.
99       */
100     protected BlockingQueue<ChannelEvent> getQueue() {
101         return queue;
102     }
103 
104     /**
105      * Returns {@code true} if and only if the {@link Channel} associated with
106      * this handler has been closed.
107      *
108      * @throws IllegalStateException
109      *         if this handler was not added to a {@link ChannelPipeline} yet
110      */
111     public boolean isClosed() {
112         return closed;
113     }
114 
115     /**
116      * Waits until a new message is received or the associated {@link Channel}
117      * is closed.
118      *
119      * @return the received message or {@code null} if the associated
120      *         {@link Channel} has been closed
121      * @throws IOException
122      *         if failed to receive a new message
123      * @throws InterruptedException
124      *         if the operation has been interrupted
125      */
126     public E read() throws IOException, InterruptedException {
127         ChannelEvent e = readEvent();
128         if (e == null) {
129             return null;
130         }
131 
132         if (e instanceof MessageEvent) {
133             return getMessage((MessageEvent) e);
134         } else if (e instanceof ExceptionEvent) {
135             throw (IOException) new IOException().initCause(((ExceptionEvent) e).getCause());
136         } else {
137             throw new IllegalStateException();
138         }
139     }
140 
141     /**
142      * Waits until a new message is received or the associated {@link Channel}
143      * is closed.
144      *
145      * @param timeout
146      *        the amount time to wait until a new message is received.
147      *        If no message is received within the timeout,
148      *        {@link BlockingReadTimeoutException} is thrown.
149      * @param unit
150      *        the unit of {@code timeout}
151      *
152      * @return the received message or {@code null} if the associated
153      *         {@link Channel} has been closed
154      * @throws BlockingReadTimeoutException
155      *         if no message was received within the specified timeout
156      * @throws IOException
157      *         if failed to receive a new message
158      * @throws InterruptedException
159      *         if the operation has been interrupted
160      */
161     public E read(long timeout, TimeUnit unit) throws IOException, InterruptedException {
162         ChannelEvent e = readEvent(timeout, unit);
163         if (e == null) {
164             return null;
165         }
166 
167         if (e instanceof MessageEvent) {
168             return getMessage((MessageEvent) e);
169         } else if (e instanceof ExceptionEvent) {
170             throw (IOException) new IOException().initCause(((ExceptionEvent) e).getCause());
171         } else {
172             throw new IllegalStateException();
173         }
174     }
175 
176     /**
177      * Waits until a new {@link ChannelEvent} is received or the associated
178      * {@link Channel} is closed.
179      *
180      * @return a {@link MessageEvent} or an {@link ExceptionEvent}.
181      *         {@code null} if the associated {@link Channel} has been closed
182      * @throws InterruptedException
183      *         if the operation has been interrupted
184      */
185     public ChannelEvent readEvent() throws InterruptedException {
186         detectDeadLock();
187         if (isClosed()) {
188             if (getQueue().isEmpty()) {
189                 return null;
190             }
191         }
192 
193         ChannelEvent e = getQueue().take();
194         if (e instanceof ChannelStateEvent) {
195             // channelClosed has been triggered.
196             assert closed;
197             return null;
198         } else {
199             return e;
200         }
201     }
202 
203     /**
204      * Waits until a new {@link ChannelEvent} is received or the associated
205      * {@link Channel} is closed.
206      *
207      * @param timeout
208      *        the amount time to wait until a new {@link ChannelEvent} is
209      *        received.  If no message is received within the timeout,
210      *        {@link BlockingReadTimeoutException} is thrown.
211      * @param unit
212      *        the unit of {@code timeout}
213      *
214      * @return a {@link MessageEvent} or an {@link ExceptionEvent}.
215      *         {@code null} if the associated {@link Channel} has been closed
216      * @throws BlockingReadTimeoutException
217      *         if no event was received within the specified timeout
218      * @throws InterruptedException
219      *         if the operation has been interrupted
220      */
221     public ChannelEvent readEvent(
222             long timeout, TimeUnit unit) throws InterruptedException, BlockingReadTimeoutException {
223         detectDeadLock();
224         if (isClosed()) {
225             if (getQueue().isEmpty()) {
226                 return null;
227             }
228         }
229 
230         ChannelEvent e = getQueue().poll(timeout, unit);
231         if (e == null) {
232             throw new BlockingReadTimeoutException();
233         } else if (e instanceof ChannelStateEvent) {
234             // channelClosed has been triggered.
235             assert closed;
236             return null;
237         } else {
238             return e;
239         }
240     }
241 
242     private static void detectDeadLock() {
243         if (DeadLockProofWorker.PARENT.get() != null) {
244             throw new IllegalStateException(
245                     "read*(...) in I/O thread causes a dead lock or " +
246                     "sudden performance drop. Implement a state machine or " +
247                     "call read*() from a different thread.");
248         }
249     }
250 
251     @Override
252     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
253             throws Exception {
254         getQueue().put(e);
255     }
256 
257     @Override
258     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
259             throws Exception {
260         getQueue().put(e);
261     }
262 
263     @Override
264     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
265             throws Exception {
266         closed = true;
267         getQueue().put(e);
268     }
269 
270     @SuppressWarnings("unchecked")
271     private E getMessage(MessageEvent e) {
272         return (E) e.getMessage();
273     }
274 }