Class ReplayingDecoder<S>

  • Type Parameters:
    S - the state type which is usually an Enum; use Void if state management is unused
    All Implemented Interfaces:
    ChannelHandler, ChannelInboundHandler
    Direct Known Subclasses:
    CompatibleMarshallingDecoder, MqttDecoder, Socks4ClientDecoder, Socks4ServerDecoder, Socks5CommandRequestDecoder, Socks5CommandResponseDecoder, Socks5InitialRequestDecoder, Socks5InitialResponseDecoder, Socks5PasswordAuthRequestDecoder, Socks5PasswordAuthResponseDecoder, SocksAuthRequestDecoder, SocksAuthResponseDecoder, SocksCmdRequestDecoder, SocksCmdResponseDecoder, SocksInitRequestDecoder, SocksInitResponseDecoder, StompSubframeDecoder, WebSocket00FrameDecoder

    public abstract class ReplayingDecoder<S>
    extends ByteToMessageDecoder
    A specialized variation of ByteToMessageDecoder which lets you implement a non-blocking decoder in the style of blocking I/O.

    The biggest difference between ReplayingDecoder and ByteToMessageDecoder is that ReplayingDecoder lets you implement the decode() and decodeLast() methods as if all required bytes had already been received, rather than checking whether the required bytes are available. For example, the following ByteToMessageDecoder implementation:

     public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
    
       @Override
       protected void decode(ChannelHandlerContext ctx,
                               ByteBuf buf, List<Object> out) throws Exception {
    
         if (buf.readableBytes() < 4) {
            return;
         }
    
         buf.markReaderIndex();
         int length = buf.readInt();
    
         if (buf.readableBytes() < length) {
            buf.resetReaderIndex();
            return;
         }
    
         out.add(buf.readBytes(length));
       }
     }
     
    is simplified like the following with ReplayingDecoder:
     public class IntegerHeaderFrameDecoder
          extends ReplayingDecoder<Void> {
    
       @Override
       protected void decode(ChannelHandlerContext ctx,
                               ByteBuf buf, List<Object> out) throws Exception {
    
         out.add(buf.readBytes(buf.readInt()));
       }
     }
     

    How does this work?

    ReplayingDecoder passes a specialized ByteBuf implementation which throws an Error of a certain type when there is not enough data in the buffer. In the IntegerHeaderFrameDecoder above, you simply assumed that there would be 4 or more bytes in the buffer when you called buf.readInt(). If there really are 4 bytes in the buffer, it returns the integer header as you expected. Otherwise, the Error is raised and control returns to ReplayingDecoder. When ReplayingDecoder catches the Error, it rewinds the readerIndex of the buffer back to the 'initial' position (i.e. the beginning of the buffer) and calls the decode(..) method again when more data is received into the buffer.

    Please note that ReplayingDecoder always throws the same cached Error instance to avoid the overhead of creating a new Error and filling its stack trace for every throw.
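
    The replay mechanism described above can be modelled in plain Java. The following is a minimal sketch under stated assumptions, not Netty's actual implementation: ReplaySketch, Signal, Buffer, decodeFrame and tryDecode are hypothetical names invented for illustration, and the buffer is a simple byte list rather than a ByteBuf. It shows a single cached Error instance created without a stack trace, a decoder body written as if all bytes were present, and a driver that rewinds the reader index when the signal is caught.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the replay mechanism (hypothetical, not Netty code). */
final class ReplaySketch {

    /** Thrown when a read runs past the received data; a single cached instance. */
    static final class Signal extends Error {
        static final Signal INSTANCE = new Signal();

        private Signal() {
            // No message, no cause, suppression disabled, stack trace not writable,
            // so throwing the cached instance never fills in a stack trace.
            super(null, null, false, false);
        }
    }

    /** Hypothetical growable buffer whose reads throw Signal on underflow. */
    static final class Buffer {
        private final List<Byte> data = new ArrayList<>();
        int readerIndex;

        void write(int... bytes) {
            for (int b : bytes) {
                data.add((byte) b);
            }
        }

        int readInt() {
            if (data.size() - readerIndex < 4) {
                throw Signal.INSTANCE;
            }
            int value = 0;
            for (int i = 0; i < 4; i++) {
                value = (value << 8) | (data.get(readerIndex++) & 0xFF);
            }
            return value;
        }

        byte[] readBytes(int length) {
            if (data.size() - readerIndex < length) {
                throw Signal.INSTANCE;
            }
            byte[] out = new byte[length];
            for (int i = 0; i < length; i++) {
                out[i] = data.get(readerIndex++);
            }
            return out;
        }
    }

    /** Decoder body written as if the whole frame had already arrived. */
    static byte[] decodeFrame(Buffer buf) {
        return buf.readBytes(buf.readInt());
    }

    /** Driver: returns a frame, or null after rewinding if data is still missing. */
    static byte[] tryDecode(Buffer buf) {
        int initial = buf.readerIndex;
        try {
            return decodeFrame(buf);
        } catch (Signal replay) {
            buf.readerIndex = initial; // rewind; the caller retries after more data arrives
            return null;
        }
    }
}
```

    In this model, calling tryDecode after each arrival of data yields null until the 4-byte header and the full payload are present, and every failed attempt leaves readerIndex where it started.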

    Limitations

    In exchange for this simplicity, ReplayingDecoder imposes a few limitations:

    • Some buffer operations are prohibited.
    • Performance can be worse if the network is slow and the message format is complicated, unlike the example above. In this case, your decoder might have to decode the same part of the message over and over again.
    • You must keep in mind that the decode(..) method can be called many times to decode a single message. For example, the following code will not work:
       public class MyDecoder extends ReplayingDecoder<Void> {
      
         private final Queue<Integer> values = new LinkedList<Integer>();
      
         @Override
         public void decode(ChannelHandlerContext ctx,
                            ByteBuf buf, List<Object> out) throws Exception {
      
           // A message contains 2 integers.
           values.offer(buf.readInt());
           values.offer(buf.readInt());
      
           // This assertion will fail intermittently since values.offer()
           // can be called more than two times!
           assert values.size() == 2;
           out.add(values.poll() + values.poll());
         }
       }
      The correct implementation looks like the following, and you can also utilize the 'checkpoint' feature which is explained in detail in the next section.
       public class MyDecoder extends ReplayingDecoder<Void> {
      
         private final Queue<Integer> values = new LinkedList<Integer>();
      
         @Override
         public void decode(ChannelHandlerContext ctx,
                            ByteBuf buf, List<Object> out) throws Exception {
      
           // Revert the state of the variable that might have been changed
           // since the last partial decode.
           values.clear();
      
           // A message contains 2 integers.
           values.offer(buf.readInt());
           values.offer(buf.readInt());
      
           // Now we know this assertion will never fail.
           assert values.size() == 2;
           out.add(values.poll() + values.poll());
         }
       }

    Improving the performance

    Fortunately, the performance of a complex decoder implementation can be improved significantly with the checkpoint() method. The checkpoint() method updates the 'initial' position of the buffer so that ReplayingDecoder rewinds the readerIndex of the buffer to the last position where you called the checkpoint() method.
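
    The effect of a checkpoint can be illustrated with a small plain-Java model. This is a sketch under stated assumptions rather than Netty's implementation: CheckpointSketch, Replay, feed and headerReads are hypothetical names, values arrive one int at a time, and a message is a count followed by that many values, decoded to their sum. Because the rewind position is moved forward after the count has been read, the count is read only once even though decode() runs again on every arrival.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of checkpointing (hypothetical, not Netty code). */
final class CheckpointSketch {

    /** Cached signal thrown when a read runs past the received data. */
    static final class Replay extends Error {
        static final Replay INSTANCE = new Replay();

        private Replay() {
            super(null, null, false, false);
        }
    }

    private final List<Integer> received = new ArrayList<>();
    private int readerIndex;
    private int checkpoint;      // position to rewind to when Replay is caught
    private boolean haveCount;   // decoder state that survives across calls
    private int count;
    int headerReads;             // instrumentation: how often the count was read

    private int readInt() {
        if (readerIndex >= received.size()) {
            throw Replay.INSTANCE;
        }
        return received.get(readerIndex++);
    }

    /** Moves the rewind position to the current reader position. */
    private void checkpoint() {
        checkpoint = readerIndex;
    }

    private int decode() {
        if (!haveCount) {
            count = readInt();
            headerReads++;
            haveCount = true;
            checkpoint(); // the count will not be read again for this message
        }
        int sum = 0;
        for (int i = 0; i < count; i++) {
            sum += readInt(); // may throw Replay; values are re-read on the next call
        }
        haveCount = false;
        checkpoint();
        return sum;
    }

    /** Adds one received value; returns the decoded sum, or null if incomplete. */
    Integer feed(int value) {
        received.add(value);
        try {
            return decode();
        } catch (Replay replay) {
            readerIndex = checkpoint; // rewind only as far as the last checkpoint
            return null;
        }
    }
}
```

    In this model the values after the count are still re-read on each attempt; adding further checkpoints inside the loop would require keeping the partial sum and loop position as decoder state as well.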

    Calling checkpoint(S) with an Enum

    Although you can just use the checkpoint() method and manage the state of the decoder yourself, the easiest way to manage the state of the decoder is to create an Enum type that represents the current state of the decoder and to call the checkpoint(S) method whenever the state changes. You can have as many states as you want, depending on the complexity of the message you want to decode:

     public enum MyDecoderState {
       READ_LENGTH,
       READ_CONTENT;
     }
    
     public class IntegerHeaderFrameDecoder
          extends ReplayingDecoder<MyDecoderState> {
    
       private int length;
    
       public IntegerHeaderFrameDecoder() {
         // Set the initial state.
         super(MyDecoderState.READ_LENGTH);
       }
    
       @Override
       protected void decode(ChannelHandlerContext ctx,
                               ByteBuf buf, List<Object> out) throws Exception {
         switch (state()) {
         case READ_LENGTH:
           length = buf.readInt();
           checkpoint(MyDecoderState.READ_CONTENT);
           // Intentional fall-through: try to read the content in the same call.
         case READ_CONTENT:
           ByteBuf frame = buf.readBytes(length);
           checkpoint(MyDecoderState.READ_LENGTH);
           out.add(frame);
           break;
         default:
           throw new Error("Shouldn't reach here.");
         }
       }
     }
     

    Calling checkpoint() with no parameter

    Alternatively, you can track the decoder state yourself and call checkpoint() with no parameter:

     public class IntegerHeaderFrameDecoder
          extends ReplayingDecoder<Void> {
    
       private boolean readLength;
       private int length;
    
       @Override
       protected void decode(ChannelHandlerContext ctx,
                               ByteBuf buf, List<Object> out) throws Exception {
         if (!readLength) {
           length = buf.readInt();
           readLength = true;
           checkpoint();
         }
    
         if (readLength) {
           ByteBuf frame = buf.readBytes(length);
           readLength = false;
           checkpoint();
           out.add(frame);
         }
       }
     }
     

    Replacing a decoder with another decoder in a pipeline

    If you are going to write a protocol multiplexer, you will probably want to replace a ReplayingDecoder (protocol detector) with another ReplayingDecoder, ByteToMessageDecoder or MessageToMessageDecoder (actual protocol decoder). It is not possible to achieve this simply by calling ChannelPipeline.replace(ChannelHandler, String, ChannelHandler); some additional steps are required:

     public class FirstDecoder extends ReplayingDecoder<Void> {
    
         @Override
         protected void decode(ChannelHandlerContext ctx,
                                 ByteBuf buf, List<Object> out) {
             ...
             // Decode the first message
             Object firstMessage = ...;
    
             // Add the second decoder
             ctx.pipeline().addLast("second", new SecondDecoder());
    
             if (buf.isReadable()) {
                 // Hand off the remaining data to the second decoder
                 out.add(firstMessage);
                 out.add(buf.readBytes(super.actualReadableBytes()));
             } else {
                 // Nothing to hand off
                 out.add(firstMessage);
             }
             // Remove the first decoder (me)
             ctx.pipeline().remove(this);
         }
     }
     
    • Constructor Detail

      • ReplayingDecoder

        protected ReplayingDecoder()
        Creates a new instance with no initial state (i.e., null).
      • ReplayingDecoder

        protected ReplayingDecoder(S initialState)
        Creates a new instance with the specified initial state.
    • Method Detail

      • checkpoint

        protected void checkpoint()
        Stores the internal cumulative buffer's reader position.
      • checkpoint

        protected void checkpoint(S state)
        Stores the internal cumulative buffer's reader position and updates the current decoder state.
      • state

        protected S state()
        Returns the current state of this decoder.
        Returns:
        the current state of this decoder
      • state

        protected S state(S newState)
        Sets the current state of this decoder.
        Returns:
        the old state of this decoder