1 /* 2 * Copyright 2012 The Netty Project 3 * 4 * The Netty Project licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package org.jboss.netty.handler.codec.replay; 17 18 import java.net.SocketAddress; 19 20 import org.jboss.netty.buffer.ChannelBuffer; 21 import org.jboss.netty.channel.Channel; 22 import org.jboss.netty.channel.ChannelHandler; 23 import org.jboss.netty.channel.ChannelHandlerContext; 24 import org.jboss.netty.channel.ChannelPipeline; 25 import org.jboss.netty.channel.ChannelStateEvent; 26 import org.jboss.netty.channel.MessageEvent; 27 import org.jboss.netty.handler.codec.frame.FrameDecoder; 28 29 /** 30 * A specialized variation of {@link FrameDecoder} which enables implementation 31 * of a non-blocking decoder in the blocking I/O paradigm. 32 * <p> 33 * The biggest difference between {@link ReplayingDecoder} and 34 * {@link FrameDecoder} is that {@link ReplayingDecoder} allows you to 35 * implement the {@code decode()} and {@code decodeLast()} methods just like 36 * all required bytes were received already, rather than checking the 37 * availability of the required bytes. 
 * For example, the following
 * {@link FrameDecoder} implementation:
 * <pre>
 * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
 *
 *   {@code @Override}
 *   protected Object decode({@link ChannelHandlerContext} ctx,
 *                           {@link Channel} channel,
 *                           {@link ChannelBuffer} buf) throws Exception {
 *
 *     if (buf.readableBytes() &lt; 4) {
 *        return <strong>null</strong>;
 *     }
 *
 *     buf.markReaderIndex();
 *     int length = buf.readInt();
 *
 *     if (buf.readableBytes() &lt; length) {
 *        buf.resetReaderIndex();
 *        return <strong>null</strong>;
 *     }
 *
 *     return buf.readBytes(length);
 *   }
 * }
 * </pre>
 * is simplified like the following with {@link ReplayingDecoder}:
 * <pre>
 * public class IntegerHeaderFrameDecoder
 *      extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
 *
 *   protected Object decode({@link ChannelHandlerContext} ctx,
 *                           {@link Channel} channel,
 *                           {@link ChannelBuffer} buf,
 *                           {@link VoidEnum} state) throws Exception {
 *
 *     return buf.readBytes(buf.readInt());
 *   }
 * }
 * </pre>
 *
 * <h3>How does this work?</h3>
 * <p>
 * {@link ReplayingDecoder} passes a specialized {@link ChannelBuffer}
 * implementation which throws an {@link Error} of a certain type when there's not
 * enough data in the buffer.  In the {@code IntegerHeaderFrameDecoder} above,
 * you just assumed that there will be 4 or more bytes in the buffer when
 * you call {@code buf.readInt()}.  If there really are 4 or more bytes in the
 * buffer, it will return the integer header as you expected.  Otherwise, the
 * {@link Error} will be raised and the control will be returned to
 * {@link ReplayingDecoder}.  If {@link ReplayingDecoder} catches the
 * {@link Error}, then it will rewind the {@code readerIndex} of the buffer
 * back to the 'initial' position (i.e. the beginning of the buffer) and call
 * the {@code decode(..)} method again when more data is received into the
 * buffer.
 * <p>
 * Please note that {@link ReplayingDecoder} always throws the same cached
 * {@link Error} instance to avoid the overhead of creating a new {@link Error}
 * and filling its stack trace for every throw.
 *
 * <h3>Limitations</h3>
 * <p>
 * At the cost of this simplicity, {@link ReplayingDecoder} imposes a few
 * limitations on you:
 * <ul>
 * <li>Some buffer operations are prohibited.</li>
 * <li>Performance can be worse if the network is slow and the message
 *     format is complicated unlike the example above.  In this case, your
 *     decoder might have to decode the same part of the message over and over
 *     again.</li>
 * <li>You must keep in mind that the {@code decode(..)} method can be called many
 *     times to decode a single message.  For example, the following code will
 *     not work:
 * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
 *
 *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
 *
 *   {@code @Override}
 *   public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception {
 *
 *     // A message contains 2 integers.
 *     values.offer(buffer.readInt());
 *     values.offer(buffer.readInt());
 *
 *     // This assertion will fail intermittently since values.offer()
 *     // can be called more than two times!
 *     assert values.size() == 2;
 *     return values.poll() + values.poll();
 *   }
 * }</pre>
 *      The correct implementation looks like the following, and you can also
 *      utilize the 'checkpoint' feature which is explained in detail in the
 *      next section.
 * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
 *
 *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
 *
 *   {@code @Override}
 *   public Object decode(.., {@link ChannelBuffer} buffer, ..)
 *           throws Exception {
 *
 *     // Revert the state of the variable that might have been changed
 *     // since the last partial decode.
 *     values.clear();
 *
 *     // A message contains 2 integers.
 *     values.offer(buffer.readInt());
 *     values.offer(buffer.readInt());
 *
 *     // Now we know this assertion will never fail.
 *     assert values.size() == 2;
 *     return values.poll() + values.poll();
 *   }
 * }</pre>
 *     </li>
 * </ul>
 *
 * <h3>Improving the performance</h3>
 * <p>
 * Fortunately, the performance of a complex decoder implementation can be
 * improved significantly with the {@code checkpoint()} method.  The
 * {@code checkpoint()} method updates the 'initial' position of the buffer so
 * that {@link ReplayingDecoder} rewinds the {@code readerIndex} of the buffer
 * to the last position where you called the {@code checkpoint()} method.
 *
 * <h4>Calling {@code checkpoint(T)} with an {@link Enum}</h4>
 * <p>
 * Although you can just use the {@code checkpoint()} method and manage the state
 * of the decoder by yourself, the easiest way to manage the state of the
 * decoder is to create an {@link Enum} type which represents the current state
 * of the decoder and to call the {@code checkpoint(T)} method whenever the state
 * changes.  You can have as many states as you want depending on the
 * complexity of the message you want to decode:
 *
 * <pre>
 * public enum MyDecoderState {
 *   READ_LENGTH,
 *   READ_CONTENT;
 * }
 *
 * public class IntegerHeaderFrameDecoder
 *      extends {@link ReplayingDecoder}&lt;<strong>MyDecoderState</strong>&gt; {
 *
 *   private int length;
 *
 *   public IntegerHeaderFrameDecoder() {
 *     // Set the initial state.
 *     <strong>super(MyDecoderState.READ_LENGTH);</strong>
 *   }
 *
 *   {@code @Override}
 *   protected Object decode({@link ChannelHandlerContext} ctx,
 *                           {@link Channel} channel,
 *                           {@link ChannelBuffer} buf,
 *                           <b>MyDecoderState</b> state) throws Exception {
 *     switch (state) {
 *     case READ_LENGTH:
 *       length = buf.readInt();
 *       <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong>
 *     case READ_CONTENT:
 *       ChannelBuffer frame = buf.readBytes(length);
 *       <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong>
 *       return frame;
 *     default:
 *       throw new Error("Shouldn't reach here.");
 *     }
 *   }
 * }
 * </pre>
 *
 * <h4>Calling {@code checkpoint()} with no parameter</h4>
 * <p>
 * An alternative way to manage the decoder state is to manage it by yourself.
 * <pre>
 * public class IntegerHeaderFrameDecoder
 *      extends {@link ReplayingDecoder}&lt;<strong>{@link VoidEnum}</strong>&gt; {
 *
 *   <strong>private boolean readLength;</strong>
 *   private int length;
 *
 *   {@code @Override}
 *   protected Object decode({@link ChannelHandlerContext} ctx,
 *                           {@link Channel} channel,
 *                           {@link ChannelBuffer} buf,
 *                           {@link VoidEnum} state) throws Exception {
 *     if (!readLength) {
 *       length = buf.readInt();
 *       <strong>readLength = true;</strong>
 *       <strong>checkpoint();</strong>
 *     }
 *
 *     if (readLength) {
 *       ChannelBuffer frame = buf.readBytes(length);
 *       <strong>readLength = false;</strong>
 *       <strong>checkpoint();</strong>
 *       return frame;
 *     }
 *   }
 * }
 * </pre>
 *
 * <h3>Replacing a decoder with another decoder in a pipeline</h3>
 * <p>
 * If you are going to write a protocol multiplexer, you will probably want to
 * replace a {@link ReplayingDecoder} (protocol detector) with another
 * {@link ReplayingDecoder} or {@link FrameDecoder} (actual protocol decoder).
* It is not possible to achieve this simply by calling
 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
 * some additional steps are required:
 * <pre>
 * public class FirstDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
 *
 *     public FirstDecoder() {
 *         super(true); // Enable unfold
 *     }
 *
 *     {@code @Override}
 *     protected Object decode({@link ChannelHandlerContext} ctx,
 *                             {@link Channel} ch,
 *                             {@link ChannelBuffer} buf,
 *                             {@link VoidEnum} state) {
 *         ...
 *         // Decode the first message
 *         Object firstMessage = ...;
 *
 *         // Add the second decoder
 *         ctx.getPipeline().addLast("second", new SecondDecoder());
 *
 *         // Remove the first decoder (me)
 *         ctx.getPipeline().remove(this);
 *
 *         if (buf.readable()) {
 *             // Hand off the remaining data to the second decoder
 *             return new Object[] { firstMessage, buf.readBytes(<b>super.actualReadableBytes()</b>) };
 *         } else {
 *             // Nothing to hand off
 *             return firstMessage;
 *         }
 *     }
 * }
 * </pre>
 *
 * @param <T>
 *        the state type; use {@link VoidEnum} if state management is unused
 *
 * @apiviz.landmark
 * @apiviz.has org.jboss.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws
 */
public abstract class ReplayingDecoder<T extends Enum<T>>
        extends FrameDecoder {

    /** Buffer handed to the {@code decode(..)} / {@code decodeLast(..)} callbacks instead of the raw input. */
    private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(this);

    /** Current decoder state; passed to every {@code decode(..)} / {@code decodeLast(..)} call. */
    private T state;

    /**
     * Reader index the input is rewound to when a {@link ReplayError} is caught.
     * {@code -1} means {@link #checkpoint()} was called while no cumulation
     * buffer was available.
     */
    private int checkpoint;

    /** Set once a readable buffer has been received; cleared by {@code cleanup(..)}. */
    private boolean needsCleanup;

    /**
     * Creates a new instance with no initial state (i.e: {@code null}).
     */
    protected ReplayingDecoder() {
        this(null);
    }

    /**
     * Creates a new instance with no initial state (i.e: {@code null}).
     *
     * @param unfold forwarded unchanged to the {@link FrameDecoder} constructor
     */
    protected ReplayingDecoder(boolean unfold) {
        this(null, unfold);
    }

    /**
     * Creates a new instance with the specified initial state.
     *
     * @param initialState the state passed to the first {@code decode(..)} call
     */
    protected ReplayingDecoder(T initialState) {
        this(initialState, false);
    }

    /**
     * Creates a new instance with the specified initial state.
     *
     * @param initialState the state passed to the first {@code decode(..)} call
     * @param unfold       forwarded unchanged to the {@link FrameDecoder} constructor
     */
    protected ReplayingDecoder(T initialState, boolean unfold) {
        super(unfold);
        state = initialState;
    }

    /**
     * Returns the internal buffer by delegating to the superclass implementation.
     */
    @Override
    protected ChannelBuffer internalBuffer() {
        return super.internalBuffer();
    }

    /**
     * Stores the internal cumulative buffer's reader position.
     */
    protected void checkpoint() {
        ChannelBuffer cumulation = this.cumulation;
        if (cumulation != null) {
            checkpoint = cumulation.readerIndex();
        } else {
            checkpoint = -1; // buffer not available (already cleaned up)
        }
    }

    /**
     * Stores the internal cumulative buffer's reader position and updates
     * the current decoder state.
     *
     * @param state the new decoder state
     */
    protected void checkpoint(T state) {
        checkpoint();
        setState(state);
    }

    /**
     * Returns the current state of this decoder.
     *
     * @return the current state of this decoder
     */
    protected T getState() {
        return state;
    }

    /**
     * Sets the current state of this decoder.
     *
     * @param newState the state to switch to
     * @return the old state of this decoder
     */
    protected T setState(T newState) {
        T oldState = state;
        state = newState;
        return oldState;
    }

    /**
     * Decodes the received packets so far into a frame.
     *
     * @param ctx      the context of this handler
     * @param channel  the current channel
     * @param buffer   the cumulative buffer of received packets so far.
     *                 Note that the buffer might be empty, which means you
     *                 should not make an assumption that the buffer contains
     *                 at least one byte in your decoder implementation.
     * @param state    the current decoder state ({@code null} if unused)
     *
     * @return the decoded frame
     */
    protected abstract Object decode(ChannelHandlerContext ctx,
            Channel channel, ChannelBuffer buffer, T state) throws Exception;

    /**
     * Decodes the received data so far into a frame when the channel is
     * disconnected.  The default implementation simply delegates to
     * {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer, Enum)}.
     *
     * @param ctx      the context of this handler
     * @param channel  the current channel
     * @param buffer   the cumulative buffer of received packets so far.
     *                 Note that the buffer might be empty, which means you
     *                 should not make an assumption that the buffer contains
     *                 at least one byte in your decoder implementation.
     * @param state    the current decoder state ({@code null} if unused)
     *
     * @return the decoded frame
     */
    protected Object decodeLast(
            ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception {
        return decode(ctx, channel, buffer, state);
    }

    /**
     * Calls {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer, Enum)}
     * with the current state.  {@link ReplayingDecoder} itself never uses this
     * method, but it is implemented anyway to be safe.
     */
    @Override
    protected final Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
        return decode(ctx, channel, buffer, state);
    }

    /**
     * Calls {@link #decodeLast(ChannelHandlerContext, Channel, ChannelBuffer, Enum)}
     * with the current state.
     */
    @Override
    protected final Object decodeLast(
            ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
        return decodeLast(ctx, channel, buffer, state);
    }

    /**
     * Feeds a received {@link ChannelBuffer} to the decode loop.  Messages that
     * are not a {@link ChannelBuffer} are passed upstream untouched and empty
     * buffers are dropped.
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {

        Object m = e.getMessage();
        if (!(m instanceof ChannelBuffer)) {
            ctx.sendUpstream(e);
            return;
        }

        ChannelBuffer input = (ChannelBuffer) m;
        if (!input.readable()) {
            return;
        }

        needsCleanup = true;

        if (cumulation == null) {
            // the cumulation buffer is not created yet so just pass the input
            // to callDecode(...) method
            cumulation = input;

            int oldReaderIndex = input.readerIndex();
            int inputSize = input.readableBytes();

            try {
                callDecode(
                        ctx, e.getChannel(),
                        input, replayable,
                        e.getRemoteAddress());
            } finally {
                retainUnreadInput(ctx, input, oldReaderIndex, inputSize);
            }
        } else {
            input = appendToCumulation(input);
            try {
                callDecode(ctx, e.getChannel(), input, replayable, e.getRemoteAddress());
            } finally {
                updateCumulation(ctx, input);
            }
        }
    }

    /**
     * Decides what becomes the cumulation buffer after the very first input
     * buffer went through the decode loop: nothing when it was fully consumed,
     * otherwise the bytes from the last checkpoint onwards, either copied into
     * a new cumulation buffer or kept as a view of {@code input}.
     *
     * @param ctx            the context of this handler
     * @param input          the buffer that was just decoded from
     * @param oldReaderIndex the reader index of {@code input} before decoding
     * @param inputSize      the readable bytes of {@code input} before decoding
     */
    private void retainUnreadInput(
            ChannelHandlerContext ctx, ChannelBuffer input,
            int oldReaderIndex, int inputSize) throws Exception {

        int readableBytes = input.readableBytes();
        if (readableBytes <= 0) {
            // Everything was consumed - nothing to cumulate.
            cumulation = null;
            return;
        }

        int inputCapacity = input.capacity();
        // check if readableBytes == capacity we can save the copy as we will not be able to
        // optimize memory usage anyway
        boolean copy =
                readableBytes != inputCapacity &&
                inputCapacity > getMaxCumulationBufferCapacity();

        // seems like there is something readable left in the input buffer
        // or decoder wants a replay - create the cumulation buffer and
        // copy the input into it
        if (checkpoint > 0) {
            int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex);
            if (copy) {
                ChannelBuffer copied = newCumulationBuffer(ctx, bytesToPreserve);
                cumulation = copied;
                copied.writeBytes(input, checkpoint, bytesToPreserve);
            } else {
                cumulation = input.slice(checkpoint, bytesToPreserve);
            }
        } else if (checkpoint == 0) {
            ChannelBuffer retained;
            if (copy) {
                retained = newCumulationBuffer(ctx, inputSize);
                cumulation = retained;
                retained.writeBytes(input, oldReaderIndex, inputSize);
            } else {
                retained = input.slice(oldReaderIndex, inputSize);
                cumulation = retained;
            }
            retained.readerIndex(input.readerIndex());
        } else {
            // checkpoint < 0: no usable checkpoint, keep whatever is still readable.
            if (copy) {
                ChannelBuffer copied = newCumulationBuffer(ctx, input.readableBytes());
                cumulation = copied;
                copied.writeBytes(input);
            } else {
                cumulation = input;
            }
        }
    }

    /**
     * Runs the user's {@code decode(..)} repeatedly while {@code input} is
     * readable, firing every decoded message upstream, and rewinds
     * {@code input} to the checkpoint when a {@link ReplayError} is caught.
     *
     * @param context         the context of this handler
     * @param channel         the current channel
     * @param input           the real buffer whose reader index is tracked
     * @param replayableInput the buffer actually passed to {@code decode(..)}
     * @param remoteAddress   the remote address attached to fired messages (may be {@code null})
     *
     * @throws IllegalStateException if {@code decode(..)} neither consumed data nor
     *                               changed the state yet did not request a replay
     */
    private void callDecode(
            ChannelHandlerContext context, Channel channel,
            ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
        while (input.readable()) {
            // Each pass starts with the checkpoint at the current reader index.
            int oldReaderIndex = checkpoint = input.readerIndex();
            Object result = null;
            T oldState = state;
            try {
                result = decode(context, channel, replayableInput, state);
                if (result == null) {
                    if (oldReaderIndex == input.readerIndex() && oldState == state) {
                        throw new IllegalStateException(
                                "null cannot be returned if no data is consumed and state didn't change.");
                    } else {
                        // Previous data has been discarded or caused state transition.
                        // Probably it is reading on.
                        continue;
                    }
                }
            } catch (ReplayError replay) {
                // Return to the checkpoint (or oldPosition) and retry.
                int checkpoint = this.checkpoint;
                if (checkpoint >= 0) {
                    input.readerIndex(checkpoint);
                } else {
                    // Called by cleanup() - no need to maintain the readerIndex
                    // anymore because the buffer has been released already.
                }
            }

            if (result == null) {
                // Seems like more data is required.
                // Let us wait for the next notification.
                break;
            }

            if (oldReaderIndex == input.readerIndex() && oldState == state) {
                throw new IllegalStateException(
                        "decode() method must consume at least one byte " +
                        "if it returned a decoded message (caused by: " +
                        getClass() + ')');
            }

            // A successful decode
            unfoldAndFireMessageReceived(context, remoteAddress, result);
        }
    }

    /**
     * Drains what is left in the cumulation buffer, gives the decoder a final
     * {@code decodeLast(..)} call and always forwards the state event upstream.
     * Does nothing but forward the event if no readable buffer was ever received.
     */
    @Override
    protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        try {
            ChannelBuffer cumulation = this.cumulation;
            if (!needsCleanup) {
                return;
            } else {
                needsCleanup = false;
            }

            // NOTE(review): presumably marks the replayable buffer as having no
            // more incoming data - confirm against ReplayingDecoderBuffer.
            replayable.terminate();

            if (cumulation != null && cumulation.readable()) {
                // Make sure all data was read before notifying a closed channel.
                callDecode(ctx, e.getChannel(), cumulation, replayable, null);
            }

            // Call decodeLast() finally.  Please note that decodeLast() is
            // called even if there's nothing more to read from the buffer to
            // notify a user that the connection was closed explicitly.
            Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state);

            this.cumulation = null;

            if (partiallyDecoded != null) {
                unfoldAndFireMessageReceived(ctx, null, partiallyDecoded);
            }
        } catch (ReplayError replay) {
            // Ignore
        } finally {
            ctx.sendUpstream(e);
        }
    }
}