Class ReplayingDecoder<S>
java.lang.Object
  io.netty.channel.ChannelHandlerAdapter
    io.netty.channel.ChannelInboundHandlerAdapter
      io.netty.handler.codec.ByteToMessageDecoder
        io.netty.handler.codec.ReplayingDecoder<S>
- Type Parameters:
S - the state type, which is usually an Enum; use Void if state management is unused
- All Implemented Interfaces:
ChannelHandler, ChannelInboundHandler
- Direct Known Subclasses:
CompatibleMarshallingDecoder, MqttDecoder, Socks4ClientDecoder, Socks4ServerDecoder, Socks5CommandRequestDecoder, Socks5CommandResponseDecoder, Socks5InitialRequestDecoder, Socks5InitialResponseDecoder, Socks5PasswordAuthRequestDecoder, Socks5PasswordAuthResponseDecoder, SocksAuthRequestDecoder, SocksAuthResponseDecoder, SocksCmdRequestDecoder, SocksCmdResponseDecoder, SocksInitRequestDecoder, SocksInitResponseDecoder, StompSubframeDecoder, WebSocket00FrameDecoder
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
A specialized variation of ByteToMessageDecoder which enables implementation of a non-blocking decoder in the blocking I/O paradigm.

The biggest difference between ReplayingDecoder and ByteToMessageDecoder is that ReplayingDecoder allows you to implement the decode() and decodeLast() methods as if all required bytes had already been received, rather than checking the availability of the required bytes. For example, the following ByteToMessageDecoder implementation:

    public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {

        @Override
        protected void decode(ChannelHandlerContext ctx,
                              ByteBuf buf, List<Object> out) throws Exception {

            // Wait until the 4-byte length header is available.
            if (buf.readableBytes() < 4) {
                return;
            }

            buf.markReaderIndex();
            int length = buf.readInt();

            // Wait until the whole frame body is available.
            if (buf.readableBytes() < length) {
                buf.resetReaderIndex();
                return;
            }

            out.add(buf.readBytes(length));
        }
    }

is simplified like the following with ReplayingDecoder:

    public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {

        protected void decode(ChannelHandlerContext ctx,
                              ByteBuf buf, List<Object> out) throws Exception {

            out.add(buf.readBytes(buf.readInt()));
        }
    }

How does this work?
ReplayingDecoder passes a specialized ByteBuf implementation which throws an Error of a certain type when there is not enough data in the buffer. In the IntegerHeaderFrameDecoder above, you simply assumed that there would be 4 or more bytes in the buffer when you called buf.readInt(). If there really are 4 bytes in the buffer, it returns the integer header as you expected. Otherwise, the Error is raised and control returns to ReplayingDecoder. When ReplayingDecoder catches the Error, it rewinds the readerIndex of the buffer back to the 'initial' position (i.e. the beginning of the buffer) and calls the decode(..) method again when more data is received into the buffer.

Please note that ReplayingDecoder always throws the same cached Error instance to avoid the overhead of creating a new Error and filling its stack trace for every throw.

Limitations
In exchange for this simplicity, ReplayingDecoder imposes a few limitations:
- Some buffer operations are prohibited.
- Performance can be worse if the network is slow and the message format is complicated, unlike the example above. In this case, your decoder might have to decode the same part of the message over and over again.
- You must keep in mind that the decode(..) method can be called many times to decode a single message. For example, the following code will not work:

    public class MyDecoder extends ReplayingDecoder<Void> {

        private final Queue<Integer> values = new LinkedList<Integer>();

        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {

            // A message contains 2 integers.
            values.offer(buf.readInt());
            values.offer(buf.readInt());

            // This assertion will fail intermittently since values.offer()
            // can be called more than two times!
            assert values.size() == 2;
            out.add(values.poll() + values.poll());
        }
    }

  The correct implementation looks like the following, and you can also utilize the 'checkpoint' feature which is explained in detail in the next section.

    public class MyDecoder extends ReplayingDecoder<Void> {

        private final Queue<Integer> values = new LinkedList<Integer>();

        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {

            // Revert the state of the variable that might have been changed
            // since the last partial decode.
            values.clear();

            // A message contains 2 integers.
            values.offer(buf.readInt());
            values.offer(buf.readInt());

            // Now we know this assertion will never fail.
            assert values.size() == 2;
            out.add(values.poll() + values.poll());
        }
    }
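The replay behaviour behind these limitations can be modelled without Netty. What follows is a minimal sketch, assuming a java.nio.ByteBuffer stands in for the cumulation buffer. ReplaySketch, Replay, ReplayingReader and feed are hypothetical names chosen for illustration, not Netty API, and the driver loop only approximates what ReplayingDecoder does internally. It shows decode being invoked three times to produce a single message when the bytes arrive in three chunks.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy model of the replay idea (hypothetical; not Netty's implementation). */
public final class ReplaySketch {

    /** Signal created once and reused, without a stack trace. */
    static final class Replay extends Error {
        static final Replay INSTANCE = new Replay();
        private Replay() {
            super(null, null, false, false); // no suppression, no stack trace
        }
    }

    /** Reader that throws the cached signal when bytes are missing. */
    static final class ReplayingReader {
        private final ByteBuffer data;
        ReplayingReader(ByteBuffer data) { this.data = data; }

        int readInt() {
            if (data.remaining() < 4) throw Replay.INSTANCE;
            return data.getInt();
        }

        byte[] readBytes(int length) {
            if (data.remaining() < length) throw Replay.INSTANCE;
            byte[] bytes = new byte[length];
            data.get(bytes);
            return bytes;
        }
    }

    /** Number of decode invocations during the last feed call. */
    static int decodeCalls;

    /** Decoder body written as if every byte had already arrived. */
    static void decode(ReplayingReader in, List<String> out) {
        decodeCalls++;
        int length = in.readInt();
        out.add(new String(in.readBytes(length), StandardCharsets.US_ASCII));
    }

    /** Feeds chunks one by one, rewinding and retrying on the signal. */
    static List<String> feed(byte[]... chunks) {
        decodeCalls = 0;
        ByteBuffer cumulation = ByteBuffer.allocate(1024);
        List<String> out = new ArrayList<>();
        for (byte[] chunk : chunks) {
            cumulation.put(chunk);
            cumulation.flip(); // switch to reading
            ReplayingReader reader = new ReplayingReader(cumulation);
            while (cumulation.hasRemaining()) {
                int start = cumulation.position();
                try {
                    decode(reader, out);
                } catch (Replay signal) {
                    cumulation.position(start); // rewind, wait for more data
                    break;
                }
            }
            cumulation.compact(); // keep unread bytes, switch back to writing
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> out = feed(
                new byte[] {0, 0},
                new byte[] {0, 5, 'h', 'e'},
                new byte[] {'l', 'l', 'o'});
        // prints "[hello] after 3 decode calls"
        System.out.println(out + " after " + decodeCalls + " decode calls");
    }
}
```

Because the first two invocations are abandoned part-way through, any field they had modified would keep its partial changes, which is why the corrected MyDecoder clears its queue at the top of decode.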
Improving the performance
Fortunately, the performance of a complex decoder implementation can be improved significantly with the checkpoint() method. The checkpoint() method updates the 'initial' position of the buffer so that ReplayingDecoder rewinds the readerIndex of the buffer to the last position where you called the checkpoint() method.

Calling checkpoint(S) with an Enum

Although you can just use the checkpoint() method and manage the state of the decoder yourself, the easiest way to manage the state of the decoder is to create an Enum type which represents the current state of the decoder and to call the checkpoint(S) method whenever the state changes. You can have as many states as you want depending on the complexity of the message you want to decode:

    public enum MyDecoderState {
        READ_LENGTH,
        READ_CONTENT;
    }

    public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {

        private int length;

        public IntegerHeaderFrameDecoder() {
            // Set the initial state.
            super(MyDecoderState.READ_LENGTH);
        }

        @Override
        protected void decode(ChannelHandlerContext ctx,
                              ByteBuf buf, List<Object> out) throws Exception {
            switch (state()) {
            case READ_LENGTH:
                length = buf.readInt();
                checkpoint(MyDecoderState.READ_CONTENT);
                // Fall through to read the content in the same call.
            case READ_CONTENT:
                ByteBuf frame = buf.readBytes(length);
                checkpoint(MyDecoderState.READ_LENGTH);
                out.add(frame);
                break;
            default:
                throw new Error("Shouldn't reach here.");
            }
        }
    }

Calling checkpoint() with no parameter

An alternative way to manage the decoder state is to manage it yourself.

    public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {

        private boolean readLength;
        private int length;

        @Override
        protected void decode(ChannelHandlerContext ctx,
                              ByteBuf buf, List<Object> out) throws Exception {
            if (!readLength) {
                length = buf.readInt();
                readLength = true;
                checkpoint();
            }

            if (readLength) {
                ByteBuf frame = buf.readBytes(length);
                readLength = false;
                checkpoint();
                out.add(frame);
            }
        }
    }

Replacing a decoder with another decoder in a pipeline
If you are going to write a protocol multiplexer, you will probably want to replace a ReplayingDecoder (protocol detector) with another ReplayingDecoder, ByteToMessageDecoder or MessageToMessageDecoder (actual protocol decoder). Calling ChannelPipeline.replace(ChannelHandler, String, ChannelHandler) alone is not enough to achieve this; some additional steps are required:

    public class FirstDecoder extends ReplayingDecoder<Void> {

        @Override
        protected void decode(ChannelHandlerContext ctx,
                              ByteBuf buf, List<Object> out) {
            ...
            // Decode the first message
            Object firstMessage = ...;

            // Add the second decoder
            ctx.pipeline().addLast("second", new SecondDecoder());

            if (buf.isReadable()) {
                // Hand off the remaining data to the second decoder
                out.add(firstMessage);
                out.add(buf.readBytes(super.actualReadableBytes()));
            } else {
                // Nothing to hand off
                out.add(firstMessage);
            }
            // Remove the first decoder (me)
            ctx.pipeline().remove(this);
        }
    }
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class io.netty.handler.codec.ByteToMessageDecoder
ByteToMessageDecoder.Cumulator
-
Nested classes/interfaces inherited from interface io.netty.channel.ChannelHandler
ChannelHandler.Sharable
-
-
Field Summary
-
Fields inherited from class io.netty.handler.codec.ByteToMessageDecoder
COMPOSITE_CUMULATOR, MERGE_CUMULATOR
-
-
Constructor Summary
Constructors
protected ReplayingDecoder()
    Creates a new instance with no initial state (i.e. null).
protected ReplayingDecoder(S initialState)
    Creates a new instance with the specified initial state.
-
Method Summary
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, java.util.List<java.lang.Object> out)
    Called once data should be decoded from the given ByteBuf.
protected void checkpoint()
    Stores the internal cumulative buffer's reader position.
protected void checkpoint(S state)
    Stores the internal cumulative buffer's reader position and updates the current decoder state.
protected S state()
    Returns the current state of this decoder.
protected S state(S newState)
    Sets the current state of this decoder.
-
Methods inherited from class io.netty.handler.codec.ByteToMessageDecoder
actualReadableBytes, channelInactive, channelRead, channelReadComplete, decode, decodeLast, discardSomeReadBytes, handlerRemoved, handlerRemoved0, internalBuffer, isSingleDecode, setCumulator, setDiscardAfterReads, setSingleDecode, userEventTriggered
-
Methods inherited from class io.netty.channel.ChannelInboundHandlerAdapter
channelActive, channelRegistered, channelUnregistered, channelWritabilityChanged, exceptionCaught
-
Methods inherited from class io.netty.channel.ChannelHandlerAdapter
ensureNotSharable, handlerAdded, isSharable
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface io.netty.channel.ChannelHandler
handlerAdded
-
-
-
-
Constructor Detail
-
ReplayingDecoder
protected ReplayingDecoder()
Creates a new instance with no initial state (i.e. null).
-
ReplayingDecoder
protected ReplayingDecoder(S initialState)
Creates a new instance with the specified initial state.
-
-
Method Detail
-
checkpoint
protected void checkpoint()
Stores the internal cumulative buffer's reader position.
-
checkpoint
protected void checkpoint(S state)
Stores the internal cumulative buffer's reader position and updates the current decoder state.
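The effect of storing the reader position together with a state can be modelled without Netty. What follows is a minimal sketch, assuming a java.nio.ByteBuffer stands in for the internal cumulative buffer. CheckpointSketch, Replay, need and feed are hypothetical names chosen for illustration, not Netty API. Because the rewind target moves forward at each checkpoint, the length header is parsed only once even though the frame arrives in three chunks.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy model of checkpoint(S) semantics (hypothetical; not Netty's implementation). */
public final class CheckpointSketch {

    enum State { READ_LENGTH, READ_CONTENT }

    /** Signal created once and reused, without a stack trace. */
    static final class Replay extends Error {
        static final Replay INSTANCE = new Replay();
        private Replay() {
            super(null, null, false, false);
        }
    }

    private final ByteBuffer cumulation = ByteBuffer.allocate(1024);
    private State state = State.READ_LENGTH;
    private int checkpoint; // position to rewind to on replay
    private int length;

    /** How many times the length header was parsed. */
    int lengthReads;
    final List<String> out = new ArrayList<>();

    /** Stores the current reader position and switches state. */
    private void checkpoint(State next) {
        checkpoint = cumulation.position();
        state = next;
    }

    /** Throws the cached signal unless n bytes are readable. */
    private void need(int n) {
        if (cumulation.remaining() < n) throw Replay.INSTANCE;
    }

    private void decode() {
        switch (state) {
            case READ_LENGTH:
                need(4);
                length = cumulation.getInt();
                lengthReads++;
                checkpoint(State.READ_CONTENT);
                // fall through
            case READ_CONTENT:
                need(length);
                byte[] frame = new byte[length];
                cumulation.get(frame);
                checkpoint(State.READ_LENGTH);
                out.add(new String(frame, StandardCharsets.US_ASCII));
                break;
            default:
                throw new IllegalStateException("unexpected state");
        }
    }

    /** Appends a chunk and decodes as far as the data allows. */
    void feed(byte[] chunk) {
        cumulation.put(chunk);
        cumulation.flip();                  // switch to reading
        checkpoint = cumulation.position(); // unread data starts at 0
        while (cumulation.hasRemaining()) {
            try {
                decode();
            } catch (Replay signal) {
                cumulation.position(checkpoint); // rewind to last checkpoint
                break;
            }
        }
        cumulation.compact(); // drop consumed bytes, switch back to writing
    }

    public static void main(String[] args) {
        CheckpointSketch decoder = new CheckpointSketch();
        decoder.feed(new byte[] {0, 0});
        decoder.feed(new byte[] {0, 5, 'h', 'e'});
        decoder.feed(new byte[] {'l', 'l', 'o'});
        // prints "[hello], header parsed 1 time(s)"
        System.out.println(decoder.out + ", header parsed " + decoder.lengthReads + " time(s)");
    }
}
```

Without the checkpoint after the header, the second chunk's failed attempt would rewind to the start of the buffer and the header would be parsed again on the third chunk.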
-
state
protected S state()
Returns the current state of this decoder.
- Returns:
the current state of this decoder
-
state
protected S state(S newState)
Sets the current state of this decoder.
- Returns:
the old state of this decoder
-
callDecode
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, java.util.List<java.lang.Object> out)
Description copied from class: ByteToMessageDecoder
Called once data should be decoded from the given ByteBuf. This method will call ByteToMessageDecoder.decode(ChannelHandlerContext, ByteBuf, List) as long as decoding should take place.
- Overrides:
callDecode in class ByteToMessageDecoder
- Parameters:
ctx - the ChannelHandlerContext which this ByteToMessageDecoder belongs to
in - the ByteBuf from which to read data
out - the List to which decoded messages should be added
-
-