1 /* 2 * Copyright 2012 The Netty Project 3 * 4 * The Netty Project licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package org.jboss.netty.handler.codec.replay; 17 18 import org.jboss.netty.buffer.ChannelBuffer; 19 import org.jboss.netty.channel.Channel; 20 import org.jboss.netty.channel.ChannelHandler; 21 import org.jboss.netty.channel.ChannelHandlerContext; 22 import org.jboss.netty.channel.ChannelPipeline; 23 import org.jboss.netty.channel.ChannelStateEvent; 24 import org.jboss.netty.channel.MessageEvent; 25 import org.jboss.netty.handler.codec.frame.FrameDecoder; 26 27 import java.net.SocketAddress; 28 29 /** 30 * A specialized variation of {@link FrameDecoder} which enables implementation 31 * of a non-blocking decoder in the blocking I/O paradigm. 32 * <p> 33 * The biggest difference between {@link ReplayingDecoder} and 34 * {@link FrameDecoder} is that {@link ReplayingDecoder} allows you to 35 * implement the {@code decode()} and {@code decodeLast()} methods just like 36 * all required bytes were received already, rather than checking the 37 * availability of the required bytes. For example, the following 38 * {@link FrameDecoder} implementation: 39 * <pre> 40 * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} { 41 * 42 * {@code @Override} 43 * protected Object decode({@link ChannelHandlerContext} ctx, 44 * {@link Channel} channel, 45 * {@link ChannelBuffer} buf) throws Exception { 46 * 47 * if (buf.readableBytes() < 4) { 48 * return <strong>null</strong>; 49 * } 50 * 51 * buf.markReaderIndex(); 52 * int length = buf.readInt(); 53 * 54 * if (buf.readableBytes() < length) { 55 * buf.resetReaderIndex(); 56 * return <strong>null</strong>; 57 * } 58 * 59 * return buf.readBytes(length); 60 * } 61 * } 62 * </pre> 63 * is simplified like the following with {@link ReplayingDecoder}: 64 * <pre> 65 * public class IntegerHeaderFrameDecoder 66 * extends {@link ReplayingDecoder}<{@link VoidEnum}> { 67 * 68 * protected Object decode({@link ChannelHandlerContext} ctx, 69 * {@link Channel} channel, 70 * {@link ChannelBuffer} buf, 71 * {@link VoidEnum} state) throws Exception { 72 * 73 * return buf.readBytes(buf.readInt()); 74 * } 75 * } 76 * </pre> 77 * 78 * <h3>How does this work?</h3> 79 * <p> 80 * {@link ReplayingDecoder} passes a specialized {@link ChannelBuffer} 81 * implementation which throws an {@link Error} of certain type when there's not 82 * enough data in the buffer. In the {@code IntegerHeaderFrameDecoder} above, 83 * you just assumed that there will be 4 or more bytes in the buffer when 84 * you call {@code buf.readInt()}. If there's really 4 bytes in the buffer, 85 * it will return the integer header as you expected. Otherwise, the 86 * {@link Error} will be raised and the control will be returned to 87 * {@link ReplayingDecoder}. If {@link ReplayingDecoder} catches the 88 * {@link Error}, then it will rewind the {@code readerIndex} of the buffer 89 * back to the 'initial' position (i.e. the beginning of the buffer) and call 90 * the {@code decode(..)} method again when more data is received into the 91 * buffer. 92 * <p> 93 * Please note that {@link ReplayingDecoder} always throws the same cached 94 * {@link Error} instance to avoid the overhead of creating a new {@link Error} 95 * and filling its stack trace for every throw. 96 * 97 * <h3>Limitations</h3> 98 * <p> 99 * At the cost of the simplicity, {@link ReplayingDecoder} enforces you a few 100 * limitations: 101 * <ul> 102 * <li>Some buffer operations are prohibited.</li> 103 * <li>Performance can be worse if the network is slow and the message 104 * format is complicated unlike the example above. In this case, your 105 * decoder might have to decode the same part of the message over and over 106 * again.</li> 107 * <li>You must keep in mind that {@code decode(..)} method can be called many 108 * times to decode a single message. For example, the following code will 109 * not work: 110 * <pre> public class MyDecoder extends {@link ReplayingDecoder}<{@link VoidEnum}> { 111 * 112 * private final Queue<Integer> values = new LinkedList<Integer>(); 113 * 114 * {@code @Override} 115 * public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception { 116 * 117 * // A message contains 2 integers. 118 * values.offer(buffer.readInt()); 119 * values.offer(buffer.readInt()); 120 * 121 * // This assertion will fail intermittently since values.offer() 122 * // can be called more than two times! 123 * assert values.size() == 2; 124 * return values.poll() + values.poll(); 125 * } 126 * }</pre> 127 * The correct implementation looks like the following, and you can also 128 * utilize the 'checkpoint' feature which is explained in detail in the 129 * next section. 130 * <pre> public class MyDecoder extends {@link ReplayingDecoder}<{@link VoidEnum}> { 131 * 132 * private final Queue<Integer> values = new LinkedList<Integer>(); 133 * 134 * {@code @Override} 135 * public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception { 136 * 137 * // Revert the state of the variable that might have been changed 138 * // since the last partial decode. 139 * values.clear(); 140 * 141 * // A message contains 2 integers. 142 * values.offer(buffer.readInt()); 143 * values.offer(buffer.readInt()); 144 * 145 * // Now we know this assertion will never fail. 146 * assert values.size() == 2; 147 * return values.poll() + values.poll(); 148 * } 149 * }</pre> 150 * </li> 151 * </ul> 152 * 153 * <h3>Improving the performance</h3> 154 * <p> 155 * Fortunately, the performance of a complex decoder implementation can be 156 * improved significantly with the {@code checkpoint()} method. The 157 * {@code checkpoint()} method updates the 'initial' position of the buffer so 158 * that {@link ReplayingDecoder} rewinds the {@code readerIndex} of the buffer 159 * to the last position where you called the {@code checkpoint()} method. 160 * 161 * <h4>Calling {@code checkpoint(T)} with an {@link Enum}</h4> 162 * <p> 163 * Although you can just use {@code checkpoint()} method and manage the state 164 * of the decoder by yourself, the easiest way to manage the state of the 165 * decoder is to create an {@link Enum} type which represents the current state 166 * of the decoder and to call {@code checkpoint(T)} method whenever the state 167 * changes. You can have as many states as you want depending on the 168 * complexity of the message you want to decode: 169 * 170 * <pre> 171 * public enum MyDecoderState { 172 * READ_LENGTH, 173 * READ_CONTENT; 174 * } 175 * 176 * public class IntegerHeaderFrameDecoder 177 * extends {@link ReplayingDecoder}<<strong>MyDecoderState</strong>> { 178 * 179 * private int length; 180 * 181 * public IntegerHeaderFrameDecoder() { 182 * // Set the initial state. 183 * <strong>super(MyDecoderState.READ_LENGTH);</strong> 184 * } 185 * 186 * {@code @Override} 187 * protected Object decode({@link ChannelHandlerContext} ctx, 188 * {@link Channel} channel, 189 * {@link ChannelBuffer} buf, 190 * <b>MyDecoderState</b> state) throws Exception { 191 * switch (state) { 192 * case READ_LENGTH: 193 * length = buf.readInt(); 194 * <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong> 195 * case READ_CONTENT: 196 * ChannelBuffer frame = buf.readBytes(length); 197 * <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong> 198 * return frame; 199 * default: 200 * throw new Error("Shouldn't reach here."); 201 * } 202 * } 203 * } 204 * </pre> 205 * 206 * <h4>Calling {@code checkpoint()} with no parameter</h4> 207 * <p> 208 * An alternative way to manage the decoder state is to manage it by yourself. 209 * <pre> 210 * public class IntegerHeaderFrameDecoder 211 * extends {@link ReplayingDecoder}<<strong>{@link VoidEnum}</strong>> { 212 * 213 * <strong>private boolean readLength;</strong> 214 * private int length; 215 * 216 * {@code @Override} 217 * protected Object decode({@link ChannelHandlerContext} ctx, 218 * {@link Channel} channel, 219 * {@link ChannelBuffer} buf, 220 * {@link VoidEnum} state) throws Exception { 221 * if (!readLength) { 222 * length = buf.readInt(); 223 * <strong>readLength = true;</strong> 224 * <strong>checkpoint();</strong> 225 * } 226 * 227 * if (readLength) { 228 * ChannelBuffer frame = buf.readBytes(length); 229 * <strong>readLength = false;</strong> 230 * <strong>checkpoint();</strong> 231 * return frame; 232 * } 233 * } 234 * } 235 * </pre> 236 * 237 * <h3>Replacing a decoder with another decoder in a pipeline</h3> 238 * <p> 239 * If you are going to write a protocol multiplexer, you will probably want to 240 * replace a {@link ReplayingDecoder} (protocol detector) with another 241 * {@link ReplayingDecoder} or {@link FrameDecoder} (actual protocol decoder). 242 * It is not possible to achieve this simply by calling 243 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but 244 * some additional steps are required: 245 * <pre> 246 * public class FirstDecoder extends {@link ReplayingDecoder}<{@link VoidEnum}> { 247 * 248 * public FirstDecoder() { 249 * super(true); // Enable unfold 250 * } 251 * 252 * {@code @Override} 253 * protected Object decode({@link ChannelHandlerContext} ctx, 254 * {@link Channel} ch, 255 * {@link ChannelBuffer} buf, 256 * {@link VoidEnum} state) { 257 * ... 258 * // Decode the first message 259 * Object firstMessage = ...; 260 * 261 * // Add the second decoder 262 * ctx.getPipeline().addLast("second", new SecondDecoder()); 263 * 264 * // Remove the first decoder (me) 265 * ctx.getPipeline().remove(this); 266 * 267 * if (buf.readable()) { 268 * // Hand off the remaining data to the second decoder 269 * return new Object[] { firstMessage, buf.readBytes(<b>super.actualReadableBytes()</b>) }; 270 * } else { 271 * // Nothing to hand off 272 * return firstMessage; 273 * } 274 * } 275 * </pre> 276 * 277 * @param <T> 278 * the state type; use {@link VoidEnum} if state management is unused 279 * 280 * @apiviz.landmark 281 * @apiviz.has org.jboss.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws 282 */ 283 public abstract class ReplayingDecoder<T extends Enum<T>> 284 extends FrameDecoder { 285 286 private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(this); 287 private T state; 288 private int checkpoint; 289 private boolean needsCleanup; 290 291 /** 292 * Creates a new instance with no initial state (i.e: {@code null}). 293 */ 294 protected ReplayingDecoder() { 295 this(null); 296 } 297 298 protected ReplayingDecoder(boolean unfold) { 299 this(null, unfold); 300 } 301 302 /** 303 * Creates a new instance with the specified initial state. 304 */ 305 protected ReplayingDecoder(T initialState) { 306 this(initialState, false); 307 } 308 309 protected ReplayingDecoder(T initialState, boolean unfold) { 310 super(unfold); 311 state = initialState; 312 } 313 314 @Override 315 protected ChannelBuffer internalBuffer() { 316 return super.internalBuffer(); 317 } 318 319 /** 320 * Stores the internal cumulative buffer's reader position. 321 */ 322 protected void checkpoint() { 323 ChannelBuffer cumulation = this.cumulation; 324 if (cumulation != null) { 325 checkpoint = cumulation.readerIndex(); 326 } else { 327 checkpoint = -1; // buffer not available (already cleaned up) 328 } 329 } 330 331 /** 332 * Stores the internal cumulative buffer's reader position and updates 333 * the current decoder state. 334 */ 335 protected void checkpoint(T state) { 336 checkpoint(); 337 setState(state); 338 } 339 340 /** 341 * Returns the current state of this decoder. 342 * @return the current state of this decoder 343 */ 344 protected T getState() { 345 return state; 346 } 347 348 /** 349 * Sets the current state of this decoder. 350 * @return the old state of this decoder 351 */ 352 protected T setState(T newState) { 353 T oldState = state; 354 state = newState; 355 return oldState; 356 } 357 358 /** 359 * Decodes the received packets so far into a frame. 360 * 361 * @param ctx the context of this handler 362 * @param channel the current channel 363 * @param buffer the cumulative buffer of received packets so far. 364 * Note that the buffer might be empty, which means you 365 * should not make an assumption that the buffer contains 366 * at least one byte in your decoder implementation. 367 * @param state the current decoder state ({@code null} if unused) 368 * 369 * @return the decoded frame 370 */ 371 protected abstract Object decode(ChannelHandlerContext ctx, 372 Channel channel, ChannelBuffer buffer, T state) throws Exception; 373 374 /** 375 * Decodes the received data so far into a frame when the channel is 376 * disconnected. 377 * 378 * @param ctx the context of this handler 379 * @param channel the current channel 380 * @param buffer the cumulative buffer of received packets so far. 381 * Note that the buffer might be empty, which means you 382 * should not make an assumption that the buffer contains 383 * at least one byte in your decoder implementation. 384 * @param state the current decoder state ({@code null} if unused) 385 * 386 * @return the decoded frame 387 */ 388 protected Object decodeLast( 389 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception { 390 return decode(ctx, channel, buffer, state); 391 } 392 393 /** 394 * Calls {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer, Enum)}. This method 395 * should be never used by {@link ReplayingDecoder} itself. But to be safe we should handle it 396 * anyway 397 */ 398 @Override 399 protected final Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { 400 return decode(ctx, channel, buffer, state); 401 } 402 403 @Override 404 protected final Object decodeLast( 405 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { 406 return decodeLast(ctx, channel, buffer, state); 407 } 408 409 @Override 410 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 411 throws Exception { 412 413 Object m = e.getMessage(); 414 if (!(m instanceof ChannelBuffer)) { 415 ctx.sendUpstream(e); 416 return; 417 } 418 419 ChannelBuffer input = (ChannelBuffer) m; 420 if (!input.readable()) { 421 return; 422 } 423 424 needsCleanup = true; 425 426 if (cumulation == null) { 427 // the cumulation buffer is not created yet so just pass the input 428 // to callDecode(...) method 429 cumulation = input; 430 431 int oldReaderIndex = input.readerIndex(); 432 int inputSize = input.readableBytes(); 433 434 try { 435 callDecode( 436 ctx, e.getChannel(), 437 input, replayable, 438 e.getRemoteAddress()); 439 } finally { 440 int readableBytes = input.readableBytes(); 441 if (readableBytes > 0) { 442 int inputCapacity = input.capacity(); 443 // check if readableBytes == capacity we can safe the copy as we will not be able to 444 // optimize memory usage anyway 445 boolean copy = 446 readableBytes != inputCapacity && 447 inputCapacity > getMaxCumulationBufferCapacity(); 448 449 // seems like there is something readable left in the input buffer 450 // or decoder wants a replay - create the cumulation buffer and 451 // copy the input into it 452 ChannelBuffer cumulation; 453 if (checkpoint > 0) { 454 int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex); 455 if (copy) { 456 this.cumulation = cumulation = newCumulationBuffer(ctx, bytesToPreserve); 457 cumulation.writeBytes(input, checkpoint, bytesToPreserve); 458 } else { 459 this.cumulation = input.slice(checkpoint, bytesToPreserve); 460 } 461 } else if (checkpoint == 0) { 462 if (copy) { 463 this.cumulation = cumulation = newCumulationBuffer(ctx, inputSize); 464 cumulation.writeBytes(input, oldReaderIndex, inputSize); 465 cumulation.readerIndex(input.readerIndex()); 466 } else { 467 this.cumulation = cumulation = input.slice(oldReaderIndex, inputSize); 468 cumulation.readerIndex(input.readerIndex()); 469 } 470 } else { 471 if (copy) { 472 this.cumulation = cumulation = newCumulationBuffer(ctx, input.readableBytes()); 473 cumulation.writeBytes(input); 474 } else { 475 this.cumulation = input; 476 } 477 } 478 } else { 479 cumulation = null; 480 } 481 } 482 } else { 483 input = appendToCumulation(input); 484 try { 485 callDecode(ctx, e.getChannel(), input, replayable, e.getRemoteAddress()); 486 } finally { 487 updateCumulation(ctx, input); 488 } 489 } 490 } 491 492 private void callDecode( 493 ChannelHandlerContext context, Channel channel, 494 ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception { 495 while (input.readable()) { 496 int oldReaderIndex = checkpoint = input.readerIndex(); 497 Object result = null; 498 T oldState = state; 499 try { 500 result = decode(context, channel, replayableInput, state); 501 if (result == null) { 502 if (oldReaderIndex == input.readerIndex() && oldState == state) { 503 throw new IllegalStateException( 504 "null cannot be returned if no data is consumed and state didn't change."); 505 } else { 506 // Previous data has been discarded or caused state transition. 507 // Probably it is reading on. 508 continue; 509 } 510 } 511 } catch (ReplayError replay) { 512 // Return to the checkpoint (or oldPosition) and retry. 513 int checkpoint = this.checkpoint; 514 if (checkpoint >= 0) { 515 input.readerIndex(checkpoint); 516 } else { 517 // Called by cleanup() - no need to maintain the readerIndex 518 // anymore because the buffer has been released already. 519 } 520 } 521 522 if (result == null) { 523 // Seems like more data is required. 524 // Let us wait for the next notification. 525 break; 526 } 527 528 if (oldReaderIndex == input.readerIndex() && oldState == state) { 529 throw new IllegalStateException( 530 "decode() method must consume at least one byte " + 531 "if it returned a decoded message (caused by: " + 532 getClass() + ')'); 533 } 534 535 // A successful decode 536 unfoldAndFireMessageReceived(context, remoteAddress, result); 537 } 538 } 539 540 @Override 541 protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) 542 throws Exception { 543 try { 544 ChannelBuffer cumulation = this.cumulation; 545 if (!needsCleanup) { 546 return; 547 } 548 549 needsCleanup = false; 550 replayable.terminate(); 551 552 if (cumulation != null && cumulation.readable()) { 553 // Make sure all data was read before notifying a closed channel. 554 callDecode(ctx, e.getChannel(), cumulation, replayable, null); 555 } 556 557 // Call decodeLast() finally. Please note that decodeLast() is 558 // called even if there's nothing more to read from the buffer to 559 // notify a user that the connection was closed explicitly. 560 Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state); 561 562 this.cumulation = null; 563 564 if (partiallyDecoded != null) { 565 unfoldAndFireMessageReceived(ctx, null, partiallyDecoded); 566 } 567 } catch (ReplayError replay) { 568 // Ignore 569 } finally { 570 ctx.sendUpstream(e); 571 } 572 } 573 }