1 /* 2 * Copyright 2012 The Netty Project 3 * 4 * The Netty Project licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package org.jboss.netty.handler.queue; 17 18 import org.jboss.netty.buffer.ChannelBuffer; 19 import org.jboss.netty.channel.Channel; 20 import org.jboss.netty.channel.ChannelEvent; 21 import org.jboss.netty.channel.ChannelHandlerContext; 22 import org.jboss.netty.channel.ChannelPipeline; 23 import org.jboss.netty.channel.ChannelStateEvent; 24 import org.jboss.netty.channel.ExceptionEvent; 25 import org.jboss.netty.channel.MessageEvent; 26 import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 27 import org.jboss.netty.util.internal.DeadLockProofWorker; 28 29 import java.io.IOException; 30 import java.util.concurrent.BlockingQueue; 31 import java.util.concurrent.LinkedBlockingQueue; 32 import java.util.concurrent.TimeUnit; 33 34 /** 35 * Emulates blocking read operation. This handler stores all received messages 36 * into a {@link BlockingQueue} and returns the received messages when 37 * {@link #read()}, {@link #read(long, TimeUnit)}, {@link #readEvent()}, or 38 * {@link #readEvent(long, TimeUnit)} method is called. 39 * <p> 40 * Please note that this handler is only useful for the cases where there are 41 * very small number of connections, such as testing and simple client-side 42 * application development. 43 * <p> 44 * Also, any handler placed after this handler will never receive 45 * {@code messageReceived}, {@code exceptionCaught}, and {@code channelClosed} 46 * events, hence it should be placed in the last place in a pipeline. 47 * <p> 48 * Here is an example that demonstrates the usage: 49 * <pre> 50 * {@link BlockingReadHandler}<{@link ChannelBuffer}> reader = 51 * new {@link BlockingReadHandler}<{@link ChannelBuffer}>(); 52 * {@link ChannelPipeline} p = ...; 53 * p.addLast("reader", reader); 54 * 55 * ... 56 * 57 * // Read a message from a channel in a blocking manner. 58 * try { 59 * {@link ChannelBuffer} buf = reader.read(60, TimeUnit.SECONDS); 60 * if (buf == null) { 61 * // Connection closed. 62 * } else { 63 * // Handle the received message here. 64 * } 65 * } catch ({@link BlockingReadTimeoutException} e) { 66 * // Read timed out. 67 * } catch (IOException e) { 68 * // Other read errors 69 * } 70 * </pre> 71 * 72 * @param <E> the type of the received messages 73 */ 74 public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler { 75 76 private final BlockingQueue<ChannelEvent> queue; 77 private volatile boolean closed; 78 79 /** 80 * Creates a new instance with {@link LinkedBlockingQueue} 81 */ 82 public BlockingReadHandler() { 83 this(new LinkedBlockingQueue<ChannelEvent>()); 84 } 85 86 /** 87 * Creates a new instance with the specified {@link BlockingQueue}. 88 */ 89 public BlockingReadHandler(BlockingQueue<ChannelEvent> queue) { 90 if (queue == null) { 91 throw new NullPointerException("queue"); 92 } 93 this.queue = queue; 94 } 95 96 /** 97 * Returns the queue which stores the received messages. The default 98 * implementation returns the queue which was specified in the constructor. 99 */ 100 protected BlockingQueue<ChannelEvent> getQueue() { 101 return queue; 102 } 103 104 /** 105 * Returns {@code true} if and only if the {@link Channel} associated with 106 * this handler has been closed. 107 * 108 * @throws IllegalStateException 109 * if this handler was not added to a {@link ChannelPipeline} yet 110 */ 111 public boolean isClosed() { 112 return closed; 113 } 114 115 /** 116 * Waits until a new message is received or the associated {@link Channel} 117 * is closed. 118 * 119 * @return the received message or {@code null} if the associated 120 * {@link Channel} has been closed 121 * @throws IOException 122 * if failed to receive a new message 123 * @throws InterruptedException 124 * if the operation has been interrupted 125 */ 126 public E read() throws IOException, InterruptedException { 127 ChannelEvent e = readEvent(); 128 if (e == null) { 129 return null; 130 } 131 132 if (e instanceof MessageEvent) { 133 return getMessage((MessageEvent) e); 134 } else if (e instanceof ExceptionEvent) { 135 throw (IOException) new IOException().initCause(((ExceptionEvent) e).getCause()); 136 } else { 137 throw new IllegalStateException(); 138 } 139 } 140 141 /** 142 * Waits until a new message is received or the associated {@link Channel} 143 * is closed. 144 * 145 * @param timeout 146 * the amount time to wait until a new message is received. 147 * If no message is received within the timeout, 148 * {@link BlockingReadTimeoutException} is thrown. 149 * @param unit 150 * the unit of {@code timeout} 151 * 152 * @return the received message or {@code null} if the associated 153 * {@link Channel} has been closed 154 * @throws BlockingReadTimeoutException 155 * if no message was received within the specified timeout 156 * @throws IOException 157 * if failed to receive a new message 158 * @throws InterruptedException 159 * if the operation has been interrupted 160 */ 161 public E read(long timeout, TimeUnit unit) throws IOException, InterruptedException { 162 ChannelEvent e = readEvent(timeout, unit); 163 if (e == null) { 164 return null; 165 } 166 167 if (e instanceof MessageEvent) { 168 return getMessage((MessageEvent) e); 169 } else if (e instanceof ExceptionEvent) { 170 throw (IOException) new IOException().initCause(((ExceptionEvent) e).getCause()); 171 } else { 172 throw new IllegalStateException(); 173 } 174 } 175 176 /** 177 * Waits until a new {@link ChannelEvent} is received or the associated 178 * {@link Channel} is closed. 179 * 180 * @return a {@link MessageEvent} or an {@link ExceptionEvent}. 181 * {@code null} if the associated {@link Channel} has been closed 182 * @throws InterruptedException 183 * if the operation has been interrupted 184 */ 185 public ChannelEvent readEvent() throws InterruptedException { 186 detectDeadLock(); 187 if (isClosed()) { 188 if (getQueue().isEmpty()) { 189 return null; 190 } 191 } 192 193 ChannelEvent e = getQueue().take(); 194 if (e instanceof ChannelStateEvent) { 195 // channelClosed has been triggered. 196 assert closed; 197 return null; 198 } else { 199 return e; 200 } 201 } 202 203 /** 204 * Waits until a new {@link ChannelEvent} is received or the associated 205 * {@link Channel} is closed. 206 * 207 * @param timeout 208 * the amount time to wait until a new {@link ChannelEvent} is 209 * received. If no message is received within the timeout, 210 * {@link BlockingReadTimeoutException} is thrown. 211 * @param unit 212 * the unit of {@code timeout} 213 * 214 * @return a {@link MessageEvent} or an {@link ExceptionEvent}. 215 * {@code null} if the associated {@link Channel} has been closed 216 * @throws BlockingReadTimeoutException 217 * if no event was received within the specified timeout 218 * @throws InterruptedException 219 * if the operation has been interrupted 220 */ 221 public ChannelEvent readEvent( 222 long timeout, TimeUnit unit) throws InterruptedException, BlockingReadTimeoutException { 223 detectDeadLock(); 224 if (isClosed()) { 225 if (getQueue().isEmpty()) { 226 return null; 227 } 228 } 229 230 ChannelEvent e = getQueue().poll(timeout, unit); 231 if (e == null) { 232 throw new BlockingReadTimeoutException(); 233 } else if (e instanceof ChannelStateEvent) { 234 // channelClosed has been triggered. 235 assert closed; 236 return null; 237 } else { 238 return e; 239 } 240 } 241 242 private static void detectDeadLock() { 243 if (DeadLockProofWorker.PARENT.get() != null) { 244 throw new IllegalStateException( 245 "read*(...) in I/O thread causes a dead lock or " + 246 "sudden performance drop. Implement a state machine or " + 247 "call read*() from a different thread."); 248 } 249 } 250 251 @Override 252 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 253 throws Exception { 254 getQueue().put(e); 255 } 256 257 @Override 258 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) 259 throws Exception { 260 getQueue().put(e); 261 } 262 263 @Override 264 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) 265 throws Exception { 266 closed = true; 267 getQueue().put(e); 268 } 269 270 @SuppressWarnings("unchecked") 271 private E getMessage(MessageEvent e) { 272 return (E) e.getMessage(); 273 } 274 }