View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.replay;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.channel.Channel;
20  import org.jboss.netty.channel.ChannelHandler;
21  import org.jboss.netty.channel.ChannelHandlerContext;
22  import org.jboss.netty.channel.ChannelPipeline;
23  import org.jboss.netty.channel.ChannelStateEvent;
24  import org.jboss.netty.channel.MessageEvent;
25  import org.jboss.netty.handler.codec.frame.FrameDecoder;
26  
27  import java.net.SocketAddress;
28  
29  /**
30   * A specialized variation of {@link FrameDecoder} which enables implementation
31   * of a non-blocking decoder in the blocking I/O paradigm.
32   * <p>
33   * The biggest difference between {@link ReplayingDecoder} and
34   * {@link FrameDecoder} is that {@link ReplayingDecoder} allows you to
35   * implement the {@code decode()} and {@code decodeLast()} methods as if
36   * all required bytes had already been received, rather than checking the
37   * availability of the required bytes.  For example, the following
38   * {@link FrameDecoder} implementation:
39   * <pre>
40   * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
41   *
42   *   {@code @Override}
43   *   protected Object decode({@link ChannelHandlerContext} ctx,
44   *                           {@link Channel} channel,
45   *                           {@link ChannelBuffer} buf) throws Exception {
46   *
47   *     if (buf.readableBytes() &lt; 4) {
48   *        return <strong>null</strong>;
49   *     }
50   *
51   *     buf.markReaderIndex();
52   *     int length = buf.readInt();
53   *
54   *     if (buf.readableBytes() &lt; length) {
55   *        buf.resetReaderIndex();
56   *        return <strong>null</strong>;
57   *     }
58   *
59   *     return buf.readBytes(length);
60   *   }
61   * }
62   * </pre>
63   * is simplified like the following with {@link ReplayingDecoder}:
64   * <pre>
65   * public class IntegerHeaderFrameDecoder
66   *      extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
67   *
68   *   protected Object decode({@link ChannelHandlerContext} ctx,
69   *                           {@link Channel} channel,
70   *                           {@link ChannelBuffer} buf,
71   *                           {@link VoidEnum} state) throws Exception {
72   *
73   *     return buf.readBytes(buf.readInt());
74   *   }
75   * }
76   * </pre>
77   *
78   * <h3>How does this work?</h3>
79   * <p>
80   * {@link ReplayingDecoder} passes a specialized {@link ChannelBuffer}
81   * implementation which throws an {@link Error} of certain type when there's not
82   * enough data in the buffer.  In the {@code IntegerHeaderFrameDecoder} above,
83   * you just assumed that there will be 4 or more bytes in the buffer when
84   * you call {@code buf.readInt()}.  If there really are 4 or more bytes in the buffer,
85   * it will return the integer header as you expected.  Otherwise, the
86   * {@link Error} will be raised and the control will be returned to
87   * {@link ReplayingDecoder}.  If {@link ReplayingDecoder} catches the
88   * {@link Error}, then it will rewind the {@code readerIndex} of the buffer
89   * back to the 'initial' position (i.e. the beginning of the buffer) and call
90   * the {@code decode(..)} method again when more data is received into the
91   * buffer.
92   * <p>
93   * Please note that {@link ReplayingDecoder} always throws the same cached
94   * {@link Error} instance to avoid the overhead of creating a new {@link Error}
95   * and filling its stack trace for every throw.
96   *
97   * <h3>Limitations</h3>
98   * <p>
99   * In exchange for this simplicity, {@link ReplayingDecoder} imposes a few
100  * limitations on you:
101  * <ul>
102  * <li>Some buffer operations are prohibited.</li>
103  * <li>Performance can be worse if the network is slow and the message
104  *     format is complicated unlike the example above.  In this case, your
105  *     decoder might have to decode the same part of the message over and over
106  *     again.</li>
107  * <li>You must keep in mind that {@code decode(..)} method can be called many
108  *     times to decode a single message.  For example, the following code will
109  *     not work:
110  * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
111  *
112  *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
113  *
114  *   {@code @Override}
115  *   public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception {
116  *
117  *     // A message contains 2 integers.
118  *     values.offer(buffer.readInt());
119  *     values.offer(buffer.readInt());
120  *
121  *     // This assertion will fail intermittently since values.offer()
122  *     // can be called more than two times!
123  *     assert values.size() == 2;
124  *     return values.poll() + values.poll();
125  *   }
126  * }</pre>
127  *      The correct implementation looks like the following, and you can also
128  *      utilize the 'checkpoint' feature which is explained in detail in the
129  *      next section.
130  * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
131  *
132  *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
133  *
134  *   {@code @Override}
135  *   public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception {
136  *
137  *     // Revert the state of the variable that might have been changed
138  *     // since the last partial decode.
139  *     values.clear();
140  *
141  *     // A message contains 2 integers.
142  *     values.offer(buffer.readInt());
143  *     values.offer(buffer.readInt());
144  *
145  *     // Now we know this assertion will never fail.
146  *     assert values.size() == 2;
147  *     return values.poll() + values.poll();
148  *   }
149  * }</pre>
150  *     </li>
151  * </ul>
152  *
153  * <h3>Improving the performance</h3>
154  * <p>
155  * Fortunately, the performance of a complex decoder implementation can be
156  * improved significantly with the {@code checkpoint()} method.  The
157  * {@code checkpoint()} method updates the 'initial' position of the buffer so
158  * that {@link ReplayingDecoder} rewinds the {@code readerIndex} of the buffer
159  * to the last position where you called the {@code checkpoint()} method.
160  *
161  * <h4>Calling {@code checkpoint(T)} with an {@link Enum}</h4>
162  * <p>
163  * Although you can just use {@code checkpoint()} method and manage the state
164  * of the decoder by yourself, the easiest way to manage the state of the
165  * decoder is to create an {@link Enum} type which represents the current state
166  * of the decoder and to call {@code checkpoint(T)} method whenever the state
167  * changes.  You can have as many states as you want depending on the
168  * complexity of the message you want to decode:
169  *
170  * <pre>
171  * public enum MyDecoderState {
172  *   READ_LENGTH,
173  *   READ_CONTENT;
174  * }
175  *
176  * public class IntegerHeaderFrameDecoder
177  *      extends {@link ReplayingDecoder}&lt;<strong>MyDecoderState</strong>&gt; {
178  *
179  *   private int length;
180  *
181  *   public IntegerHeaderFrameDecoder() {
182  *     // Set the initial state.
183  *     <strong>super(MyDecoderState.READ_LENGTH);</strong>
184  *   }
185  *
186  *   {@code @Override}
187  *   protected Object decode({@link ChannelHandlerContext} ctx,
188  *                           {@link Channel} channel,
189  *                           {@link ChannelBuffer} buf,
190  *                           <b>MyDecoderState</b> state) throws Exception {
191  *     switch (state) {
192  *     case READ_LENGTH:
193  *       length = buf.readInt();
194  *       <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong>
195  *     case READ_CONTENT:
196  *       ChannelBuffer frame = buf.readBytes(length);
197  *       <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong>
198  *       return frame;
199  *     default:
200  *       throw new Error("Shouldn't reach here.");
201  *     }
202  *   }
203  * }
204  * </pre>
205  *
206  * <h4>Calling {@code checkpoint()} with no parameter</h4>
207  * <p>
208  * An alternative way to manage the decoder state is to manage it by yourself.
209  * <pre>
210  * public class IntegerHeaderFrameDecoder
211  *      extends {@link ReplayingDecoder}&lt;<strong>{@link VoidEnum}</strong>&gt; {
212  *
213  *   <strong>private boolean readLength;</strong>
214  *   private int length;
215  *
216  *   {@code @Override}
217  *   protected Object decode({@link ChannelHandlerContext} ctx,
218  *                           {@link Channel} channel,
219  *                           {@link ChannelBuffer} buf,
220  *                           {@link VoidEnum} state) throws Exception {
221  *     if (!readLength) {
222  *       length = buf.readInt();
223  *       <strong>readLength = true;</strong>
224  *       <strong>checkpoint();</strong>
225  *     }
226  *
227  *     if (readLength) {
228  *       ChannelBuffer frame = buf.readBytes(length);
229  *       <strong>readLength = false;</strong>
230  *       <strong>checkpoint();</strong>
231  *       return frame;
232  *     }
233  *   }
234  * }
235  * </pre>
236  *
237  * <h3>Replacing a decoder with another decoder in a pipeline</h3>
238  * <p>
239  * If you are going to write a protocol multiplexer, you will probably want to
240  * replace a {@link ReplayingDecoder} (protocol detector) with another
241  * {@link ReplayingDecoder} or {@link FrameDecoder} (actual protocol decoder).
242  * It is not possible to achieve this simply by calling
243  * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
244  * some additional steps are required:
245  * <pre>
246  * public class FirstDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
247  *
248  *     public FirstDecoder() {
249  *         super(true); // Enable unfold
250  *     }
251  *
252  *     {@code @Override}
253  *     protected Object decode({@link ChannelHandlerContext} ctx,
254  *                             {@link Channel} ch,
255  *                             {@link ChannelBuffer} buf,
256  *                             {@link VoidEnum} state) {
257  *         ...
258  *         // Decode the first message
259  *         Object firstMessage = ...;
260  *
261  *         // Add the second decoder
262  *         ctx.getPipeline().addLast("second", new SecondDecoder());
263  *
264  *         // Remove the first decoder (me)
265  *         ctx.getPipeline().remove(this);
266  *
267  *         if (buf.readable()) {
268  *             // Hand off the remaining data to the second decoder
269  *             return new Object[] { firstMessage, buf.readBytes(<b>super.actualReadableBytes()</b>) };
270  *         } else {
271  *             // Nothing to hand off
272  *             return firstMessage;
273  *         }
274  *     }
     * }
275  * </pre>
276  *
277  * @param <T>
278  *        the state type; use {@link VoidEnum} if state management is unused
279  *
280  * @apiviz.landmark
281  * @apiviz.has org.jboss.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws
282  */
283 public abstract class ReplayingDecoder<T extends Enum<T>>
284         extends FrameDecoder {
285 
        // Buffer handed to decode()/decodeLast() in place of the real cumulation
        // buffer; it is constructed around this decoder instance.
286     private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(this);
        // Current decoder state as passed to decode(); null when state
        // management is unused.
287     private T state;
        // Reader index that callDecode() rewinds to when a ReplayError is caught.
        // Set to -1 by checkpoint() when no cumulation buffer is available.
288     private int checkpoint;
        // Set by messageReceived() once readable data has arrived and cleared by
        // cleanup(); while false, cleanup() only forwards the event upstream.
289     private boolean needsCleanup;
290 
291     /**
292      * Creates a new instance with no initial state (i.e: {@code null}).
293      */
294     protected ReplayingDecoder() {
295         this(null);
296     }
297 
        /**
         * Creates a new instance with no initial state (i.e: {@code null}).
         *
         * @param unfold  forwarded unchanged to the {@link FrameDecoder}
         *                constructor
         */
298     protected ReplayingDecoder(boolean unfold) {
299         this(null, unfold);
300     }
301 
302     /**
303      * Creates a new instance with the specified initial state.
304      */
305     protected ReplayingDecoder(T initialState) {
306         this(initialState, false);
307     }
308 
        /**
         * Creates a new instance with the specified initial state.
         *
         * @param initialState  the state passed to the first {@code decode(..)}
         *                      call ({@code null} if unused)
         * @param unfold        forwarded unchanged to the {@link FrameDecoder}
         *                      constructor
         */
309     protected ReplayingDecoder(T initialState, boolean unfold) {
310         super(unfold);
311         state = initialState;
312     }
313 
        /**
         * Returns whatever the superclass implementation returns; no behavior is
         * added here.
         * NOTE(review): presumably redeclared so that classes in this package
         * (such as the replaying buffer) can call this protected method - confirm.
         */
314     @Override
315     protected ChannelBuffer internalBuffer() {
316         return super.internalBuffer();
317     }
318 
319     /**
320      * Stores the internal cumulative buffer's reader position.  If there is
         * no cumulative buffer, the stored position becomes {@code -1}.
321      */
322     protected void checkpoint() {
323         ChannelBuffer cumulation = this.cumulation;
324         if (cumulation != null) {
325             checkpoint = cumulation.readerIndex();
326         } else {
327             checkpoint = -1; // buffer not available (already cleaned up)
328         }
329     }
330 
331     /**
332      * Stores the internal cumulative buffer's reader position and updates
333      * the current decoder state.
334      */
335     protected void checkpoint(T state) {
336         checkpoint();
337         setState(state);
338     }
339 
340     /**
341      * Returns the current state of this decoder.
342      * @return the current state of this decoder
343      */
344     protected T getState() {
345         return state;
346     }
347 
348     /**
349      * Sets the current state of this decoder.
350      * @return the old state of this decoder
351      */
352     protected T setState(T newState) {
353         T oldState = state;
354         state = newState;
355         return oldState;
356     }
357 
358     /**
359      * Decodes the received packets so far into a frame.
360      *
361      * @param ctx      the context of this handler
362      * @param channel  the current channel
363      * @param buffer   the cumulative buffer of received packets so far.
364      *                 Note that the buffer might be empty, which means you
365      *                 should not make an assumption that the buffer contains
366      *                 at least one byte in your decoder implementation.
367      * @param state    the current decoder state ({@code null} if unused)
368      *
369      * @return the decoded frame
370      */
371     protected abstract Object decode(ChannelHandlerContext ctx,
372             Channel channel, ChannelBuffer buffer, T state) throws Exception;
373 
374     /**
375      * Decodes the received data so far into a frame when the channel is
376      * disconnected.  The default implementation simply delegates to
         * {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer, Enum)}.
377      *
378      * @param ctx      the context of this handler
379      * @param channel  the current channel
380      * @param buffer   the cumulative buffer of received packets so far.
381      *                 Note that the buffer might be empty, which means you
382      *                 should not make an assumption that the buffer contains
383      *                 at least one byte in your decoder implementation.
384      * @param state    the current decoder state ({@code null} if unused)
385      *
386      * @return the decoded frame
387      */
388     protected Object decodeLast(
389             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception {
390         return decode(ctx, channel, buffer, state);
391     }
392 
393     /**
394      * Calls {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer, Enum)} with the
395      * current state.  This method should never be used by {@link ReplayingDecoder} itself,
396      * but it is handled anyway to be safe.
397      */
398     @Override
399     protected final Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
400         return decode(ctx, channel, buffer, state);
401     }
402 
        /**
         * Calls {@link #decodeLast(ChannelHandlerContext, Channel, ChannelBuffer, Enum)}
         * with the current state.
         */
403     @Override
404     protected final Object decodeLast(
405             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
406         return decodeLast(ctx, channel, buffer, state);
407     }
408 
        /**
         * Feeds a received {@link ChannelBuffer} to the decoder.
         * <p>
         * Messages that are not a {@link ChannelBuffer} are forwarded upstream
         * untouched, and buffers with nothing readable are dropped.  When no
         * cumulation buffer exists yet, the input itself is decoded directly and
         * whatever is left over afterwards becomes the new cumulation; otherwise
         * the input is appended to the existing cumulation before decoding.
         */
409     @Override
410     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
411             throws Exception {
412 
413         Object m = e.getMessage();
414         if (!(m instanceof ChannelBuffer)) {
415             ctx.sendUpstream(e);
416             return;
417         }
418 
419         ChannelBuffer input = (ChannelBuffer) m;
420         if (!input.readable()) {
421             return;
422         }
423 
            // From now on cleanup() has real work to do.
424         needsCleanup = true;
425 
426         if (cumulation == null) {
427             // the cumulation buffer is not created yet so just pass the input
428             // to callDecode(...) method
429             cumulation = input;
430 
                // Remember the readable region of the input as it was before
                // decoding; the finally block below needs it to work out which
                // bytes must be preserved.
431             int oldReaderIndex = input.readerIndex();
432             int inputSize = input.readableBytes();
433 
434             try {
435                 callDecode(
436                         ctx, e.getChannel(),
437                         input, replayable,
438                         e.getRemoteAddress());
439             } finally {
440                 int readableBytes = input.readableBytes();
441                 if (readableBytes > 0) {
442                     int inputCapacity = input.capacity();
443                     // If readableBytes == capacity we can skip the copy, because copying
444                     // would not reduce memory usage anyway.  Otherwise copy only when the
                        // input is larger than the maximum cumulation buffer capacity.
445                     boolean copy =
446                             readableBytes != inputCapacity &&
447                             inputCapacity > getMaxCumulationBufferCapacity();
448 
449                     // seems like there is something readable left in the input buffer
450                     // or decoder wants a replay - create the cumulation buffer and
451                     // copy the input into it
452                     ChannelBuffer cumulation;
453                     if (checkpoint > 0) {
                            // Keep everything from the checkpoint up to the end of the
                            // originally readable region.
454                         int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex);
455                         if (copy) {
456                             this.cumulation = cumulation = newCumulationBuffer(ctx, bytesToPreserve);
457                             cumulation.writeBytes(input, checkpoint, bytesToPreserve);
458                         } else {
459                             this.cumulation = input.slice(checkpoint, bytesToPreserve);
460                         }
461                     } else if (checkpoint == 0) {
                            // Keep the whole originally readable region and carry over
                            // the input's current reader index.
462                         if (copy) {
463                             this.cumulation = cumulation = newCumulationBuffer(ctx, inputSize);
464                             cumulation.writeBytes(input, oldReaderIndex, inputSize);
465                             cumulation.readerIndex(input.readerIndex());
466                         } else {
467                             this.cumulation = cumulation = input.slice(oldReaderIndex, inputSize);
468                             cumulation.readerIndex(input.readerIndex());
469                         }
470                     } else {
                            // Negative checkpoint: checkpoint() found no cumulation
                            // buffer, so keep only what is still readable.
471                         if (copy) {
472                             this.cumulation = cumulation = newCumulationBuffer(ctx, input.readableBytes());
473                             cumulation.writeBytes(input);
474                         } else {
475                             this.cumulation = input;
476                         }
477                     }
478                 } else {
                        // Everything was consumed; nothing to accumulate.
479                     cumulation = null;
480                 }
481             }
482         } else {
483             input = appendToCumulation(input);
484             try {
485                 callDecode(ctx, e.getChannel(), input, replayable, e.getRemoteAddress());
486             } finally {
487                 updateCumulation(ctx, input);
488             }
489         }
490     }
491 
        /**
         * Repeatedly calls {@code decode(..)} while {@code input} has readable
         * bytes, firing every decoded message upstream.
         * <p>
         * Each iteration first resets the checkpoint to the current reader index.
         * The loop stops when a {@link ReplayError} is caught (after rewinding
         * {@code input} to the checkpoint, if one is available).  A {@code null}
         * result is retried as long as bytes were consumed or the state changed.
         *
         * @param context          the context of this handler
         * @param channel          the current channel
         * @param input            the real buffer whose reader index is tracked
         *                         and rewound
         * @param replayableInput  the buffer actually passed to {@code decode(..)}
         * @param remoteAddress    the remote address attached to fired messages
         *
         * @throws IllegalStateException if {@code decode(..)} returns, with or
         *         without a message, having neither consumed data nor changed
         *         the state
         */
492     private void callDecode(
493             ChannelHandlerContext context, Channel channel,
494             ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
495         while (input.readable()) {
                // The start of this iteration is the default rewind position until
                // the decoder calls checkpoint() itself.
496             int oldReaderIndex = checkpoint = input.readerIndex();
497             Object result = null;
498             T oldState = state;
499             try {
500                 result = decode(context, channel, replayableInput, state);
501                 if (result == null) {
502                     if (oldReaderIndex == input.readerIndex() && oldState == state) {
503                         throw new IllegalStateException(
504                                 "null cannot be returned if no data is consumed and state didn't change.");
505                     } else {
506                         // Previous data has been discarded or caused state transition.
507                         // Probably it is reading on.
508                         continue;
509                     }
510                 }
511             } catch (ReplayError replay) {
512                 // Return to the checkpoint (or oldPosition) and retry.
513                 int checkpoint = this.checkpoint;
514                 if (checkpoint >= 0) {
515                     input.readerIndex(checkpoint);
516                 } else {
517                     // Called by cleanup() - no need to maintain the readerIndex
518                     // anymore because the buffer has been released already.
519                 }
520             }
521 
                // result is still null here only when a ReplayError was caught.
522             if (result == null) {
523                 // Seems like more data is required.
524                 // Let us wait for the next notification.
525                 break;
526             }
527 
528             if (oldReaderIndex == input.readerIndex() && oldState == state) {
529                 throw new IllegalStateException(
530                         "decode() method must consume at least one byte " +
531                         "if it returned a decoded message (caused by: " +
532                         getClass() + ')');
533             }
534 
535             // A successful decode
536             unfoldAndFireMessageReceived(context, remoteAddress, result);
537         }
538     }
539 
        /**
         * Drains the decoder for the given channel state event.
         * <p>
         * If data was ever received, this terminates the replaying buffer, decodes
         * whatever is still readable in the cumulation, calls {@code decodeLast(..)}
         * once more, drops the cumulation and fires any resulting message.  A
         * {@link ReplayError} raised along the way is ignored.  The event itself is
         * always forwarded upstream, even when nothing had to be cleaned up or an
         * exception was thrown.
         */
540     @Override
541     protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
542             throws Exception {
543         try {
544             ChannelBuffer cumulation = this.cumulation;
545             if (!needsCleanup) {
546                 return;
547             }
548 
549             needsCleanup = false;
550             replayable.terminate();
551 
552             if (cumulation != null && cumulation.readable()) {
553                 // Make sure all data was read before notifying a closed channel.
554                 callDecode(ctx, e.getChannel(), cumulation, replayable, null);
555             }
556 
557             // Call decodeLast() finally.  Please note that decodeLast() is
558             // called even if there's nothing more to read from the buffer to
559             // notify a user that the connection was closed explicitly.
560             Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state);
561 
                // Drop the reference only after decodeLast() has run, and before
                // the last message is fired.
562             this.cumulation = null;
563 
564             if (partiallyDecoded != null) {
565                 unfoldAndFireMessageReceived(ctx, null, partiallyDecoded);
566             }
567         } catch (ReplayError replay) {
568             // Ignore
569         } finally {
570             ctx.sendUpstream(e);
571         }
572     }
573 }