View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.replay;
17  
18  import java.net.SocketAddress;
19  
20  import org.jboss.netty.buffer.ChannelBuffer;
21  import org.jboss.netty.channel.Channel;
22  import org.jboss.netty.channel.ChannelHandler;
23  import org.jboss.netty.channel.ChannelHandlerContext;
24  import org.jboss.netty.channel.ChannelPipeline;
25  import org.jboss.netty.channel.ChannelStateEvent;
26  import org.jboss.netty.channel.MessageEvent;
27  import org.jboss.netty.handler.codec.frame.FrameDecoder;
28  
29  /**
30   * A specialized variation of {@link FrameDecoder} which enables implementation
31   * of a non-blocking decoder in the blocking I/O paradigm.
32   * <p>
33   * The biggest difference between {@link ReplayingDecoder} and
34   * {@link FrameDecoder} is that {@link ReplayingDecoder} allows you to
35   * implement the {@code decode()} and {@code decodeLast()} methods just like
36   * all required bytes were received already, rather than checking the
37   * availability of the required bytes.  For example, the following
38   * {@link FrameDecoder} implementation:
39   * <pre>
40   * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
41   *
42   *   {@code @Override}
43   *   protected Object decode({@link ChannelHandlerContext} ctx,
44   *                           {@link Channel} channel,
45   *                           {@link ChannelBuffer} buf) throws Exception {
46   *
47   *     if (buf.readableBytes() &lt; 4) {
48   *        return <strong>null</strong>;
49   *     }
50   *
51   *     buf.markReaderIndex();
52   *     int length = buf.readInt();
53   *
54   *     if (buf.readableBytes() &lt; length) {
55   *        buf.resetReaderIndex();
56   *        return <strong>null</strong>;
57   *     }
58   *
59   *     return buf.readBytes(length);
60   *   }
61   * }
62   * </pre>
63   * is simplified like the following with {@link ReplayingDecoder}:
64   * <pre>
65   * public class IntegerHeaderFrameDecoder
66   *      extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
67   *
68   *   protected Object decode({@link ChannelHandlerContext} ctx,
69   *                           {@link Channel} channel,
70   *                           {@link ChannelBuffer} buf,
71   *                           {@link VoidEnum} state) throws Exception {
72   *
73   *     return buf.readBytes(buf.readInt());
74   *   }
75   * }
76   * </pre>
77   *
78   * <h3>How does this work?</h3>
79   * <p>
80   * {@link ReplayingDecoder} passes a specialized {@link ChannelBuffer}
81   * implementation which throws an {@link Error} of certain type when there's not
82   * enough data in the buffer.  In the {@code IntegerHeaderFrameDecoder} above,
83   * you just assumed that there will be 4 or more bytes in the buffer when
84   * you call {@code buf.readInt()}.  If there's really 4 bytes in the buffer,
85   * it will return the integer header as you expected.  Otherwise, the
86   * {@link Error} will be raised and the control will be returned to
87   * {@link ReplayingDecoder}.  If {@link ReplayingDecoder} catches the
88   * {@link Error}, then it will rewind the {@code readerIndex} of the buffer
89   * back to the 'initial' position (i.e. the beginning of the buffer) and call
90   * the {@code decode(..)} method again when more data is received into the
91   * buffer.
92   * <p>
93   * Please note that {@link ReplayingDecoder} always throws the same cached
94   * {@link Error} instance to avoid the overhead of creating a new {@link Error}
95   * and filling its stack trace for every throw.
96   *
97   * <h3>Limitations</h3>
98   * <p>
99   * At the cost of the simplicity, {@link ReplayingDecoder} enforces you a few
100  * limitations:
101  * <ul>
102  * <li>Some buffer operations are prohibited.</li>
103  * <li>Performance can be worse if the network is slow and the message
104  *     format is complicated unlike the example above.  In this case, your
105  *     decoder might have to decode the same part of the message over and over
106  *     again.</li>
107  * <li>You must keep in mind that {@code decode(..)} method can be called many
108  *     times to decode a single message.  For example, the following code will
109  *     not work:
110  * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
111  *
112  *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
113  *
114  *   {@code @Override}
115  *   public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception {
116  *
117  *     // A message contains 2 integers.
118  *     values.offer(buffer.readInt());
119  *     values.offer(buffer.readInt());
120  *
121  *     // This assertion will fail intermittently since values.offer()
122  *     // can be called more than two times!
123  *     assert values.size() == 2;
124  *     return values.poll() + values.poll();
125  *   }
126  * }</pre>
127  *      The correct implementation looks like the following, and you can also
128  *      utilize the 'checkpoint' feature which is explained in detail in the
129  *      next section.
130  * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
131  *
132  *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
133  *
134  *   {@code @Override}
135  *   public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception {
136  *
137  *     // Revert the state of the variable that might have been changed
138  *     // since the last partial decode.
139  *     values.clear();
140  *
141  *     // A message contains 2 integers.
142  *     values.offer(buffer.readInt());
143  *     values.offer(buffer.readInt());
144  *
145  *     // Now we know this assertion will never fail.
146  *     assert values.size() == 2;
147  *     return values.poll() + values.poll();
148  *   }
149  * }</pre>
150  *     </li>
151  * </ul>
152  *
153  * <h3>Improving the performance</h3>
154  * <p>
155  * Fortunately, the performance of a complex decoder implementation can be
156  * improved significantly with the {@code checkpoint()} method.  The
157  * {@code checkpoint()} method updates the 'initial' position of the buffer so
158  * that {@link ReplayingDecoder} rewinds the {@code readerIndex} of the buffer
159  * to the last position where you called the {@code checkpoint()} method.
160  *
161  * <h4>Calling {@code checkpoint(T)} with an {@link Enum}</h4>
162  * <p>
163  * Although you can just use {@code checkpoint()} method and manage the state
164  * of the decoder by yourself, the easiest way to manage the state of the
165  * decoder is to create an {@link Enum} type which represents the current state
166  * of the decoder and to call {@code checkpoint(T)} method whenever the state
167  * changes.  You can have as many states as you want depending on the
168  * complexity of the message you want to decode:
169  *
170  * <pre>
171  * public enum MyDecoderState {
172  *   READ_LENGTH,
173  *   READ_CONTENT;
174  * }
175  *
176  * public class IntegerHeaderFrameDecoder
177  *      extends {@link ReplayingDecoder}&lt;<strong>MyDecoderState</strong>&gt; {
178  *
179  *   private int length;
180  *
181  *   public IntegerHeaderFrameDecoder() {
182  *     // Set the initial state.
183  *     <strong>super(MyDecoderState.READ_LENGTH);</strong>
184  *   }
185  *
186  *   {@code @Override}
187  *   protected Object decode({@link ChannelHandlerContext} ctx,
188  *                           {@link Channel} channel,
189  *                           {@link ChannelBuffer} buf,
190  *                           <b>MyDecoderState</b> state) throws Exception {
191  *     switch (state) {
192  *     case READ_LENGTH:
193  *       length = buf.readInt();
194  *       <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong>
195  *     case READ_CONTENT:
196  *       ChannelBuffer frame = buf.readBytes(length);
197  *       <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong>
198  *       return frame;
199  *     default:
200  *       throw new Error("Shouldn't reach here.");
201  *     }
202  *   }
203  * }
204  * </pre>
205  *
206  * <h4>Calling {@code checkpoint()} with no parameter</h4>
207  * <p>
208  * An alternative way to manage the decoder state is to manage it by yourself.
209  * <pre>
210  * public class IntegerHeaderFrameDecoder
211  *      extends {@link ReplayingDecoder}&lt;<strong>{@link VoidEnum}</strong>&gt; {
212  *
213  *   <strong>private boolean readLength;</strong>
214  *   private int length;
215  *
216  *   {@code @Override}
217  *   protected Object decode({@link ChannelHandlerContext} ctx,
218  *                           {@link Channel} channel,
219  *                           {@link ChannelBuffer} buf,
220  *                           {@link VoidEnum} state) throws Exception {
221  *     if (!readLength) {
222  *       length = buf.readInt();
223  *       <strong>readLength = true;</strong>
224  *       <strong>checkpoint();</strong>
225  *     }
226  *
227  *     if (readLength) {
228  *       ChannelBuffer frame = buf.readBytes(length);
229  *       <strong>readLength = false;</strong>
230  *       <strong>checkpoint();</strong>
231  *       return frame;
232  *     }
233  *   }
234  * }
235  * </pre>
236  *
237  * <h3>Replacing a decoder with another decoder in a pipeline</h3>
238  * <p>
239  * If you are going to write a protocol multiplexer, you will probably want to
240  * replace a {@link ReplayingDecoder} (protocol detector) with another
241  * {@link ReplayingDecoder} or {@link FrameDecoder} (actual protocol decoder).
242  * It is not possible to achieve this simply by calling
243  * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
244  * some additional steps are required:
245  * <pre>
246  * public class FirstDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
247  *
248  *     public FirstDecoder() {
249  *         super(true); // Enable unfold
250  *     }
251  *
252  *     {@code @Override}
253  *     protected Object decode({@link ChannelHandlerContext} ctx,
254  *                             {@link Channel} ch,
255  *                             {@link ChannelBuffer} buf,
256  *                             {@link VoidEnum} state) {
257  *         ...
258  *         // Decode the first message
259  *         Object firstMessage = ...;
260  *
261  *         // Add the second decoder
262  *         ctx.getPipeline().addLast("second", new SecondDecoder());
263  *
264  *         // Remove the first decoder (me)
265  *         ctx.getPipeline().remove(this);
266  *
267  *         if (buf.readable()) {
268  *             // Hand off the remaining data to the second decoder
269  *             return new Object[] { firstMessage, buf.readBytes(<b>super.actualReadableBytes()</b>) };
270  *         } else {
271  *             // Nothing to hand off
272  *             return firstMessage;
273  *         }
274  *     }
275  * </pre>
276  *
277  * @param <T>
278  *        the state type; use {@link VoidEnum} if state management is unused
279  *
280  * @apiviz.landmark
281  * @apiviz.has org.jboss.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws
282  */
283 public abstract class ReplayingDecoder<T extends Enum<T>>
284         extends FrameDecoder {
285 
286 
287     private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(this);
288     private T state;
289     private int checkpoint;
290     private boolean needsCleanup;
291 
292 
293     /**
294      * Creates a new instance with no initial state (i.e: {@code null}).
295      */
296     protected ReplayingDecoder() {
297         this(null);
298     }
299 
300     protected ReplayingDecoder(boolean unfold) {
301         this(null, unfold);
302     }
303 
304     /**
305      * Creates a new instance with the specified initial state.
306      */
307     protected ReplayingDecoder(T initialState) {
308         this(initialState, false);
309     }
310 
311     protected ReplayingDecoder(T initialState, boolean unfold) {
312         super(unfold);
313         state = initialState;
314     }
315 
316     @Override
317     protected ChannelBuffer internalBuffer() {
318         return super.internalBuffer();
319     }
320 
321     /**
322      * Stores the internal cumulative buffer's reader position.
323      */
324     protected void checkpoint() {
325         ChannelBuffer cumulation = this.cumulation;
326         if (cumulation != null) {
327             checkpoint = cumulation.readerIndex();
328         } else {
329             checkpoint = -1; // buffer not available (already cleaned up)
330         }
331     }
332 
333     /**
334      * Stores the internal cumulative buffer's reader position and updates
335      * the current decoder state.
336      */
337     protected void checkpoint(T state) {
338         checkpoint();
339         setState(state);
340     }
341 
342     /**
343      * Returns the current state of this decoder.
344      * @return the current state of this decoder
345      */
346     protected T getState() {
347         return state;
348     }
349 
350     /**
351      * Sets the current state of this decoder.
352      * @return the old state of this decoder
353      */
354     protected T setState(T newState) {
355         T oldState = state;
356         state = newState;
357         return oldState;
358     }
359 
360     /**
361      * Decodes the received packets so far into a frame.
362      *
363      * @param ctx      the context of this handler
364      * @param channel  the current channel
365      * @param buffer   the cumulative buffer of received packets so far.
366      *                 Note that the buffer might be empty, which means you
367      *                 should not make an assumption that the buffer contains
368      *                 at least one byte in your decoder implementation.
369      * @param state    the current decoder state ({@code null} if unused)
370      *
371      * @return the decoded frame
372      */
373     protected abstract Object decode(ChannelHandlerContext ctx,
374             Channel channel, ChannelBuffer buffer, T state) throws Exception;
375 
376     /**
377      * Decodes the received data so far into a frame when the channel is
378      * disconnected.
379      *
380      * @param ctx      the context of this handler
381      * @param channel  the current channel
382      * @param buffer   the cumulative buffer of received packets so far.
383      *                 Note that the buffer might be empty, which means you
384      *                 should not make an assumption that the buffer contains
385      *                 at least one byte in your decoder implementation.
386      * @param state    the current decoder state ({@code null} if unused)
387      *
388      * @return the decoded frame
389      */
390     protected Object decodeLast(
391             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception {
392         return decode(ctx, channel, buffer, state);
393     }
394 
395     /**
396      * Calls {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer, Enum)}. This method
397      * should be never used by {@link ReplayingDecoder} itself.  But to be safe we should handle it
398      * anyway
399      */
400     @Override
401     protected final Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
402         return decode(ctx, channel, buffer, state);
403     }
404 
405     @Override
406     protected final Object decodeLast(
407             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
408         return decodeLast(ctx, channel, buffer, state);
409     }
410 
411     @Override
412     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
413             throws Exception {
414 
415         Object m = e.getMessage();
416         if (!(m instanceof ChannelBuffer)) {
417             ctx.sendUpstream(e);
418             return;
419         }
420 
421         ChannelBuffer input = (ChannelBuffer) m;
422         if (!input.readable()) {
423             return;
424         }
425 
426         needsCleanup = true;
427 
428         if (cumulation == null) {
429             // the cumulation buffer is not created yet so just pass the input
430             // to callDecode(...) method
431             cumulation = input;
432 
433             int oldReaderIndex = input.readerIndex();
434             int inputSize = input.readableBytes();
435 
436             try {
437                 callDecode(
438                         ctx, e.getChannel(),
439                         input, replayable,
440                         e.getRemoteAddress());
441             } finally {
442                 int readableBytes = input.readableBytes();
443                 if (readableBytes > 0) {
444                     int inputCapacity = input.capacity();
445                     // check if readableBytes == capacity we can safe the copy as we will not be able to
446                     // optimize memory usage anyway
447                     boolean copy =
448                             readableBytes != inputCapacity &&
449                             inputCapacity > getMaxCumulationBufferCapacity();
450 
451                     // seems like there is something readable left in the input buffer
452                     // or decoder wants a replay - create the cumulation buffer and
453                     // copy the input into it
454                     ChannelBuffer cumulation;
455                     if (checkpoint > 0) {
456                         int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex);
457                         if (copy) {
458                             this.cumulation = cumulation = newCumulationBuffer(ctx, bytesToPreserve);
459                             cumulation.writeBytes(input, checkpoint, bytesToPreserve);
460                         } else {
461                             this.cumulation = cumulation = input.slice(checkpoint, bytesToPreserve);
462                         }
463                     } else if (checkpoint == 0) {
464                         if (copy) {
465                             this.cumulation = cumulation = newCumulationBuffer(ctx, inputSize);
466                             cumulation.writeBytes(input, oldReaderIndex, inputSize);
467                             cumulation.readerIndex(input.readerIndex());
468                         } else {
469                             this.cumulation = cumulation = input.slice(oldReaderIndex, inputSize);
470                             cumulation.readerIndex(input.readerIndex());
471                         }
472                     } else {
473                         if (copy) {
474                             this.cumulation = cumulation = newCumulationBuffer(ctx, input.readableBytes());
475                             cumulation.writeBytes(input);
476                         } else {
477                             this.cumulation = cumulation = input;
478                         }
479                     }
480                 } else {
481                     cumulation = null;
482                 }
483             }
484         } else {
485             input = appendToCumulation(input);
486             try {
487                 callDecode(ctx, e.getChannel(), input, replayable, e.getRemoteAddress());
488             } finally {
489                 updateCumulation(ctx, input);
490             }
491         }
492     }
493 
494     private void callDecode(
495             ChannelHandlerContext context, Channel channel,
496             ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
497         while (input.readable()) {
498             int oldReaderIndex = checkpoint = input.readerIndex();
499             Object result = null;
500             T oldState = state;
501             try {
502                 result = decode(context, channel, replayableInput, state);
503                 if (result == null) {
504                     if (oldReaderIndex == input.readerIndex() && oldState == state) {
505                         throw new IllegalStateException(
506                                 "null cannot be returned if no data is consumed and state didn't change.");
507                     } else {
508                         // Previous data has been discarded or caused state transition.
509                         // Probably it is reading on.
510                         continue;
511                     }
512                 }
513             } catch (ReplayError replay) {
514                 // Return to the checkpoint (or oldPosition) and retry.
515                 int checkpoint = this.checkpoint;
516                 if (checkpoint >= 0) {
517                     input.readerIndex(checkpoint);
518                 } else {
519                     // Called by cleanup() - no need to maintain the readerIndex
520                     // anymore because the buffer has been released already.
521                 }
522             }
523 
524             if (result == null) {
525                 // Seems like more data is required.
526                 // Let us wait for the next notification.
527                 break;
528             }
529 
530             if (oldReaderIndex == input.readerIndex() && oldState == state) {
531                 throw new IllegalStateException(
532                         "decode() method must consume at least one byte " +
533                         "if it returned a decoded message (caused by: " +
534                         getClass() + ')');
535             }
536 
537             // A successful decode
538             unfoldAndFireMessageReceived(context, remoteAddress, result);
539         }
540     }
541 
542     @Override
543     protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
544             throws Exception {
545         try {
546             ChannelBuffer cumulation = this.cumulation;
547             if (!needsCleanup) {
548                 return;
549             } else {
550                 needsCleanup = false;
551             }
552 
553             replayable.terminate();
554 
555             if (cumulation != null && cumulation.readable()) {
556                 // Make sure all data was read before notifying a closed channel.
557                 callDecode(ctx, e.getChannel(), cumulation, replayable, null);
558             }
559 
560             // Call decodeLast() finally.  Please note that decodeLast() is
561             // called even if there's nothing more to read from the buffer to
562             // notify a user that the connection was closed explicitly.
563             Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state);
564 
565             this.cumulation = null;
566 
567             if (partiallyDecoded != null) {
568                 unfoldAndFireMessageReceived(ctx, null, partiallyDecoded);
569             }
570         } catch (ReplayError replay) {
571             // Ignore
572         } finally {
573             ctx.sendUpstream(e);
574         }
575     }
576 }