View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelHandler;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.util.Signal;
24  import io.netty.util.internal.StringUtil;
25  
26  import java.util.List;
27  
28  /**
29   * A specialized variation of {@link ByteToMessageDecoder} which enables implementation
30   * of a non-blocking decoder in the blocking I/O paradigm.
31   * <p>
32   * The biggest difference between {@link ReplayingDecoder} and
33   * {@link ByteToMessageDecoder} is that {@link ReplayingDecoder} allows you to
34   * implement the {@code decode()} and {@code decodeLast()} methods as if
35   * all required bytes had already been received, rather than checking the
36   * availability of the required bytes.  For example, the following
37   * {@link ByteToMessageDecoder} implementation:
38   * <pre>
39   * public class IntegerHeaderFrameDecoder extends {@link ByteToMessageDecoder} {
40   *
41   *   {@code @Override}
42   *   protected void decode({@link ChannelHandlerContext} ctx,
43   *                           {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {
44   *
45   *     if (buf.readableBytes() &lt; 4) {
46   *        return;
47   *     }
48   *
49   *     buf.markReaderIndex();
50   *     int length = buf.readInt();
51   *
52   *     if (buf.readableBytes() &lt; length) {
53   *        buf.resetReaderIndex();
54   *        return;
55   *     }
56   *
57   *     out.add(buf.readBytes(length));
58   *   }
59   * }
60   * </pre>
61   * is simplified like the following with {@link ReplayingDecoder}:
62   * <pre>
63   * public class IntegerHeaderFrameDecoder
64   *      extends {@link ReplayingDecoder}&lt;{@link Void}&gt; {
65   *
66   *   protected void decode({@link ChannelHandlerContext} ctx,
67   *                           {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {
68   *
69   *     out.add(buf.readBytes(buf.readInt()));
70   *   }
71   * }
72   * </pre>
73   *
74   * <h3>How does this work?</h3>
75   * <p>
76   * {@link ReplayingDecoder} passes a specialized {@link ByteBuf}
77   * implementation which throws an {@link Error} of certain type when there's not
78   * enough data in the buffer.  In the {@code IntegerHeaderFrameDecoder} above,
79   * you just assumed that there will be 4 or more bytes in the buffer when
80   * you call {@code buf.readInt()}.  If there really are 4 bytes in the buffer,
81   * it will return the integer header as you expected.  Otherwise, the
82   * {@link Error} will be raised and the control will be returned to
83   * {@link ReplayingDecoder}.  If {@link ReplayingDecoder} catches the
84   * {@link Error}, then it will rewind the {@code readerIndex} of the buffer
85   * back to the 'initial' position (i.e. the beginning of the buffer) and call
86   * the {@code decode(..)} method again when more data is received into the
87   * buffer.
88   * <p>
89   * Please note that {@link ReplayingDecoder} always throws the same cached
90   * {@link Error} instance to avoid the overhead of creating a new {@link Error}
91   * and filling its stack trace for every throw.
92   *
93   * <h3>Limitations</h3>
94   * <p>
95   * In exchange for this simplicity, {@link ReplayingDecoder} imposes a few
96   * limitations on you:
97   * <ul>
98   * <li>Some buffer operations are prohibited.</li>
99   * <li>Performance can be worse if the network is slow and the message
100  *     format is complicated unlike the example above.  In this case, your
101  *     decoder might have to decode the same part of the message over and over
102  *     again.</li>
103  * <li>You must keep in mind that {@code decode(..)} method can be called many
104  *     times to decode a single message.  For example, the following code will
105  *     not work:
106  * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link Void}&gt; {
107  *
108  *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
109  *
110  *   {@code @Override}
111  *   public void decode(.., {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {
112  *
113  *     // A message contains 2 integers.
114  *     values.offer(buf.readInt());
115  *     values.offer(buf.readInt());
116  *
117  *     // This assertion will fail intermittently since values.offer()
118  *     // can be called more than two times!
119  *     assert values.size() == 2;
120  *     out.add(values.poll() + values.poll());
121  *   }
122  * }</pre>
123  *      The correct implementation looks like the following, and you can also
124  *      utilize the 'checkpoint' feature which is explained in detail in the
125  *      next section.
126  * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link Void}&gt; {
127  *
128  *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
129  *
130  *   {@code @Override}
131  *   public void decode(.., {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {
132  *
133  *     // Revert the state of the variable that might have been changed
134  *     // since the last partial decode.
135  *     values.clear();
136  *
137  *     // A message contains 2 integers.
138  *     values.offer(buf.readInt());
139  *     values.offer(buf.readInt());
140  *
141  *     // Now we know this assertion will never fail.
142  *     assert values.size() == 2;
143  *     out.add(values.poll() + values.poll());
144  *   }
145  * }</pre>
146  *     </li>
147  * </ul>
148  *
149  * <h3>Improving the performance</h3>
150  * <p>
151  * Fortunately, the performance of a complex decoder implementation can be
152  * improved significantly with the {@code checkpoint()} method.  The
153  * {@code checkpoint()} method updates the 'initial' position of the buffer so
154  * that {@link ReplayingDecoder} rewinds the {@code readerIndex} of the buffer
155  * to the last position where you called the {@code checkpoint()} method.
156  *
157  * <h4>Calling {@code checkpoint(T)} with an {@link Enum}</h4>
158  * <p>
159  * Although you can just use {@code checkpoint()} method and manage the state
160  * of the decoder by yourself, the easiest way to manage the state of the
161  * decoder is to create an {@link Enum} type which represents the current state
162  * of the decoder and to call {@code checkpoint(T)} method whenever the state
163  * changes.  You can have as many states as you want depending on the
164  * complexity of the message you want to decode:
165  *
166  * <pre>
167  * public enum MyDecoderState {
168  *   READ_LENGTH,
169  *   READ_CONTENT;
170  * }
171  *
172  * public class IntegerHeaderFrameDecoder
173  *      extends {@link ReplayingDecoder}&lt;<strong>MyDecoderState</strong>&gt; {
174  *
175  *   private int length;
176  *
177  *   public IntegerHeaderFrameDecoder() {
178  *     // Set the initial state.
179  *     <strong>super(MyDecoderState.READ_LENGTH);</strong>
180  *   }
181  *
182  *   {@code @Override}
183  *   protected void decode({@link ChannelHandlerContext} ctx,
184  *                           {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {
185  *     switch (state()) {
186  *     case READ_LENGTH:
187  *       length = buf.readInt();
188  *       <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong>
189  *     case READ_CONTENT:
190  *       ByteBuf frame = buf.readBytes(length);
191  *       <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong>
192  *       out.add(frame);
193  *       break;
194  *     default:
195  *       throw new Error("Shouldn't reach here.");
196  *     }
197  *   }
198  * }
199  * </pre>
200  *
201  * <h4>Calling {@code checkpoint()} with no parameter</h4>
202  * <p>
203  * An alternative way to manage the decoder state is to manage it by yourself.
204  * <pre>
205  * public class IntegerHeaderFrameDecoder
206  *      extends {@link ReplayingDecoder}&lt;<strong>{@link Void}</strong>&gt; {
207  *
208  *   <strong>private boolean readLength;</strong>
209  *   private int length;
210  *
211  *   {@code @Override}
212  *   protected void decode({@link ChannelHandlerContext} ctx,
213  *                           {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {
214  *     if (!readLength) {
215  *       length = buf.readInt();
216  *       <strong>readLength = true;</strong>
217  *       <strong>checkpoint();</strong>
218  *     }
219  *
220  *     if (readLength) {
221  *       ByteBuf frame = buf.readBytes(length);
222  *       <strong>readLength = false;</strong>
223  *       <strong>checkpoint();</strong>
224  *       out.add(frame);
225  *     }
226  *   }
227  * }
228  * </pre>
229  *
230  * <h3>Replacing a decoder with another decoder in a pipeline</h3>
231  * <p>
232  * If you are going to write a protocol multiplexer, you will probably want to
233  * replace a {@link ReplayingDecoder} (protocol detector) with another
234  * {@link ReplayingDecoder}, {@link ByteToMessageDecoder} or {@link MessageToMessageDecoder}
235  * (actual protocol decoder).
236  * It is not possible to achieve this simply by calling
237  * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
238  * some additional steps are required:
239  * <pre>
240  * public class FirstDecoder extends {@link ReplayingDecoder}&lt;{@link Void}&gt; {
241  *
242  *     {@code @Override}
243  *     protected void decode({@link ChannelHandlerContext} ctx,
244  *                             {@link ByteBuf} buf, List&lt;Object&gt; out) {
245  *         ...
246  *         // Decode the first message
247  *         Object firstMessage = ...;
248  *
249  *         // Add the second decoder
250  *         ctx.pipeline().addLast("second", new SecondDecoder());
251  *
252  *         if (buf.isReadable()) {
253  *             // Hand off the remaining data to the second decoder
254  *             out.add(firstMessage);
255  *             out.add(buf.readBytes(<b>super.actualReadableBytes()</b>));
256  *         } else {
257  *             // Nothing to hand off
258  *             out.add(firstMessage);
259  *         }
260  *         // Remove the first decoder (me)
261  *         ctx.pipeline().remove(this);
262  *     }
263  * </pre>
264  * @param <S>
265  *        the state type which is usually an {@link Enum}; use {@link Void} if state management is
266  *        unused
267  */
268 public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
269 
270     static final Signal REPLAY = Signal.valueOf(ReplayingDecoder.class, "REPLAY");
271 
272     private final ReplayingDecoderByteBuf replayable = new ReplayingDecoderByteBuf();
273     private S state;
274     private int checkpoint = -1;
275 
276     /**
277      * Creates a new instance with no initial state (i.e: {@code null}).
278      */
279     protected ReplayingDecoder() {
280         this(null);
281     }
282 
283     /**
284      * Creates a new instance with the specified initial state.
285      */
286     protected ReplayingDecoder(S initialState) {
287         state = initialState;
288     }
289 
290     /**
291      * Stores the internal cumulative buffer's reader position.
292      */
293     protected void checkpoint() {
294         checkpoint = internalBuffer().readerIndex();
295     }
296 
297     /**
298      * Stores the internal cumulative buffer's reader position and updates
299      * the current decoder state.
300      */
301     protected void checkpoint(S state) {
302         checkpoint();
303         state(state);
304     }
305 
306     /**
307      * Returns the current state of this decoder.
308      * @return the current state of this decoder
309      */
310     protected S state() {
311         return state;
312     }
313 
314     /**
315      * Sets the current state of this decoder.
316      * @return the old state of this decoder
317      */
318     protected S state(S newState) {
319         S oldState = state;
320         state = newState;
321         return oldState;
322     }
323 
324     @Override
325     final void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
326         try {
327             replayable.terminate();
328             if (cumulation != null) {
329                 callDecode(ctx, internalBuffer(), out);
330             } else {
331                 replayable.setCumulation(Unpooled.EMPTY_BUFFER);
332             }
333             decodeLast(ctx, replayable, out);
334         } catch (Signal replay) {
335             // Ignore
336             replay.expect(REPLAY);
337         }
338     }
339 
340     @Override
341     protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
342         replayable.setCumulation(in);
343         try {
344             while (in.isReadable()) {
345                 int oldReaderIndex = checkpoint = in.readerIndex();
346                 int outSize = out.size();
347 
348                 if (outSize > 0) {
349                     fireChannelRead(ctx, out, outSize);
350                     out.clear();
351 
352                     // Check if this handler was removed before continuing with decoding.
353                     // If it was removed, it is not safe to continue to operate on the buffer.
354                     //
355                     // See:
356                     // - https://github.com/netty/netty/issues/4635
357                     if (ctx.isRemoved()) {
358                         break;
359                     }
360                     outSize = 0;
361                 }
362 
363                 S oldState = state;
364                 int oldInputLength = in.readableBytes();
365                 try {
366                     decodeRemovalReentryProtection(ctx, replayable, out);
367 
368                     // Check if this handler was removed before continuing the loop.
369                     // If it was removed, it is not safe to continue to operate on the buffer.
370                     //
371                     // See https://github.com/netty/netty/issues/1664
372                     if (ctx.isRemoved()) {
373                         break;
374                     }
375 
376                     if (outSize == out.size()) {
377                         if (oldInputLength == in.readableBytes() && oldState == state) {
378                             throw new DecoderException(
379                                     StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
380                                     "data or change its state if it did not decode anything.");
381                         } else {
382                             // Previous data has been discarded or caused state transition.
383                             // Probably it is reading on.
384                             continue;
385                         }
386                     }
387                 } catch (Signal replay) {
388                     replay.expect(REPLAY);
389 
390                     // Check if this handler was removed before continuing the loop.
391                     // If it was removed, it is not safe to continue to operate on the buffer.
392                     //
393                     // See https://github.com/netty/netty/issues/1664
394                     if (ctx.isRemoved()) {
395                         break;
396                     }
397 
398                     // Return to the checkpoint (or oldPosition) and retry.
399                     int checkpoint = this.checkpoint;
400                     if (checkpoint >= 0) {
401                         in.readerIndex(checkpoint);
402                     } else {
403                         // Called by cleanup() - no need to maintain the readerIndex
404                         // anymore because the buffer has been released already.
405                     }
406                     break;
407                 }
408 
409                 if (oldReaderIndex == in.readerIndex() && oldState == state) {
410                     throw new DecoderException(
411                            StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
412                            "or change its state if it decoded something.");
413                 }
414                 if (isSingleDecode()) {
415                     break;
416                 }
417             }
418         } catch (DecoderException e) {
419             throw e;
420         } catch (Exception cause) {
421             throw new DecoderException(cause);
422         }
423     }
424 }