View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.frame;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.buffer.ChannelBufferFactory;
20  import org.jboss.netty.buffer.ChannelBuffers;
21  import org.jboss.netty.buffer.CompositeChannelBuffer;
22  import org.jboss.netty.channel.Channel;
23  import org.jboss.netty.channel.ChannelHandler;
24  import org.jboss.netty.channel.ChannelHandlerContext;
25  import org.jboss.netty.channel.ChannelPipeline;
26  import org.jboss.netty.channel.ChannelStateEvent;
27  import org.jboss.netty.channel.ChannelUpstreamHandler;
28  import org.jboss.netty.channel.Channels;
29  import org.jboss.netty.channel.ExceptionEvent;
30  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
31  import org.jboss.netty.channel.MessageEvent;
32  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
33  import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
34  
35  import java.net.SocketAddress;
36  
37  /**
38   * Decodes the received {@link ChannelBuffer}s into a meaningful frame object.
39   * <p>
40   * In a stream-based transport such as TCP/IP, packets can be fragmented and
41   * reassembled during transmission even in a LAN environment.  For example,
42   * let us assume you have received three packets:
43   * <pre>
44   * +-----+-----+-----+
45   * | ABC | DEF | GHI |
46   * +-----+-----+-----+
47   * </pre>
48   * because of the packet fragmentation, a server can receive them like the
49   * following:
50   * <pre>
51   * +----+-------+---+---+
52   * | AB | CDEFG | H | I |
53   * +----+-------+---+---+
54   * </pre>
55   * <p>
56   * {@link FrameDecoder} helps you defrag the received packets into one or more
57   * meaningful <strong>frames</strong> that could be easily understood by the
58   * application logic.  In case of the example above, your {@link FrameDecoder}
59   * implementation could defrag the received packets like the following:
60   * <pre>
61   * +-----+-----+-----+
62   * | ABC | DEF | GHI |
63   * +-----+-----+-----+
64   * </pre>
65   * <p>
66   * The following code shows an example handler which decodes a frame whose
67   * first 4 bytes header represents the length of the frame, excluding the
68   * header.
69   * <pre>
70   * MESSAGE FORMAT
71   * ==============
72   *
73   * Offset:  0        4                   (Length + 4)
74   *          +--------+------------------------+
75   * Fields:  | Length | Actual message content |
76   *          +--------+------------------------+
77   *
78   * DECODER IMPLEMENTATION
79   * ======================
80   *
81   * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
82   *
83   *   {@code @Override}
84   *   protected Object decode({@link ChannelHandlerContext} ctx,
85   *                           {@link Channel} channel,
86   *                           {@link ChannelBuffer} buf) throws Exception {
87   *
88   *     // Make sure the length field was received.
89   *     if (buf.readableBytes() &lt; 4) {
90   *        // The length field was not received yet - return null.
91   *        // This method will be invoked again when more packets are
92   *        // received and appended to the buffer.
93   *        return <strong>null</strong>;
94   *     }
95   *
96   *     // The length field is in the buffer.
97   *
98   *     // Mark the current buffer position before reading the length field
99   *     // because the whole frame might not be in the buffer yet.
100  *     // We will reset the buffer position to the marked position if
101  *     // there's not enough bytes in the buffer.
102  *     buf.markReaderIndex();
103  *
104  *     // Read the length field.
105  *     int length = buf.readInt();
106  *
107  *     // Make sure there are enough bytes in the buffer.
108  *     if (buf.readableBytes() &lt; length) {
109  *        // The whole bytes were not received yet - return null.
110  *        // This method will be invoked again when more packets are
111  *        // received and appended to the buffer.
112  *
113  *        // Reset to the marked position to read the length field again
114  *        // next time.
115  *        buf.resetReaderIndex();
116  *
117  *        return <strong>null</strong>;
118  *     }
119  *
120  *     // There's enough bytes in the buffer. Read it.
121  *     {@link ChannelBuffer} frame = buf.readBytes(length);
122  *
123  *     // Successfully decoded a frame.  Return the decoded frame.
124  *     return <strong>frame</strong>;
125  *   }
126  * }
127  * </pre>
128  *
129  * <h3>Returning a POJO rather than a {@link ChannelBuffer}</h3>
130  * <p>
131  * Please note that you can return an object of a different type than
132  * {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()}
133  * implementation.  For example, you could return a
134  * <a href="http://en.wikipedia.org/wiki/POJO">POJO</a> so that the next
135  * {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which
136  * contains a POJO rather than a {@link ChannelBuffer}.
137  *
138  * <h3>Replacing a decoder with another decoder in a pipeline</h3>
139  * <p>
140  * If you are going to write a protocol multiplexer, you will probably want to
141  * replace a {@link FrameDecoder} (protocol detector) with another
142  * {@link FrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder).
143  * It is not possible to achieve this simply by calling
144  * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
145  * some additional steps are required:
146  * <pre>
147  * public class FirstDecoder extends {@link FrameDecoder} {
148  *
149  *     public FirstDecoder() {
150  *         super(true); // Enable unfold
151  *     }
152  *
153  *     {@code @Override}
154  *     protected Object decode({@link ChannelHandlerContext} ctx,
155  *                             {@link Channel} channel,
156  *                             {@link ChannelBuffer} buf) {
157  *         ...
158  *         // Decode the first message
159  *         Object firstMessage = ...;
160  *
161  *         // Add the second decoder
162  *         ctx.getPipeline().addLast("second", new SecondDecoder());
163  *
164  *         // Remove the first decoder (me)
165  *         ctx.getPipeline().remove(this);
166  *
167  *         if (buf.readable()) {
168  *             // Hand off the remaining data to the second decoder
169  *             return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };
170  *         } else {
171  *             // Nothing to hand off
172  *             return firstMessage;
173  *         }
174  *     }
175  * }
176  * </pre>
177  *
178  * @apiviz.landmark
179  */
180 public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
181 
182     public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
183 
184     private boolean unfold;
185     protected ChannelBuffer cumulation;
186     private volatile ChannelHandlerContext ctx;
187     private int copyThreshold;
188     private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
189 
190     protected FrameDecoder() {
191         this(false);
192     }
193 
194     protected FrameDecoder(boolean unfold) {
195         this.unfold = unfold;
196     }
197 
198     public final boolean isUnfold() {
199         return unfold;
200     }
201 
202     public final void setUnfold(boolean unfold) {
203         if (ctx == null) {
204             this.unfold = unfold;
205         } else {
206             throw new IllegalStateException(
207                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
208         }
209     }
210 
211     /**
212      * See {@link #setMaxCumulationBufferCapacity(int)} for explaintation of this setting
213      *
214      */
215     public final int getMaxCumulationBufferCapacity() {
216         return copyThreshold;
217     }
218 
219     /**
220      * Set the maximal capacity of the internal cumulation ChannelBuffer to use
221      * before the {@link FrameDecoder} tries to minimize the memory usage by
222      * "byte copy".
223      *
224      *
225      * What you use here really depends on your application and need. Using
226      * {@link Integer#MAX_VALUE} will disable all byte copies but give you the
227      * cost of a higher memory usage if big {@link ChannelBuffer}'s will be
228      * received.
229      *
230      * By default a threshold of {@code 0} is used, which means it will
231      * always copy to try to reduce memory usage
232      *
233      *
234      * @param copyThreshold
235      *            the threshold (in bytes) or {@link Integer#MAX_VALUE} to
236      *            disable it. The value must be at least 0
237      * @throws IllegalStateException
238      *             get thrown if someone tries to change this setting after the
239      *             Decoder was added to the {@link ChannelPipeline}
240      */
241     public final void setMaxCumulationBufferCapacity(int copyThreshold) {
242         if (copyThreshold < 0) {
243             throw new IllegalArgumentException("maxCumulationBufferCapacity must be >= 0");
244         }
245         if (ctx == null) {
246             this.copyThreshold = copyThreshold;
247         } else {
248             throw new IllegalStateException(
249                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
250         }
251     }
252 
253     /**
254      * Returns the maximum number of components in the cumulation buffer.  If the number of
255      * the components in the cumulation buffer exceeds this value, the components of the
256      * cumulation buffer are consolidated into a single component, involving memory copies.
257      * The default value of this property {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
258      */
259     public final int getMaxCumulationBufferComponents() {
260         return maxCumulationBufferComponents;
261     }
262 
263     /**
264      * Sets the maximum number of components in the cumulation buffer.  If the number of
265      * the components in the cumulation buffer exceeds this value, the components of the
266      * cumulation buffer are consolidated into a single component, involving memory copies.
267      * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
268      * and its minimum allowed value is {@code 2}.
269      */
270     public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
271         if (maxCumulationBufferComponents < 2) {
272             throw new IllegalArgumentException(
273                     "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
274                     " (expected: >= 2)");
275         }
276 
277         if (ctx == null) {
278             this.maxCumulationBufferComponents = maxCumulationBufferComponents;
279         } else {
280             throw new IllegalStateException(
281                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
282         }
283     }
284 
285     @Override
286     public void messageReceived(
287             ChannelHandlerContext ctx, MessageEvent e) throws Exception {
288 
289         Object m = e.getMessage();
290         if (!(m instanceof ChannelBuffer)) {
291             ctx.sendUpstream(e);
292             return;
293         }
294 
295         ChannelBuffer input = (ChannelBuffer) m;
296         if (!input.readable()) {
297             return;
298         }
299 
300         if (cumulation == null) {
301             try {
302                 // the cumulation buffer is not created yet so just pass the input to callDecode(...) method
303                 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
304             } finally {
305                 updateCumulation(ctx, input);
306             }
307         } else {
308             input = appendToCumulation(input);
309             try {
310                 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
311             } finally {
312                 updateCumulation(ctx, input);
313             }
314         }
315     }
316 
317     protected ChannelBuffer appendToCumulation(ChannelBuffer input) {
318         ChannelBuffer cumulation = this.cumulation;
319         assert cumulation.readable();
320         if (cumulation instanceof CompositeChannelBuffer) {
321             // Make sure the resulting cumulation buffer has no more than the configured components.
322             CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
323             if (composite.numComponents() >= maxCumulationBufferComponents) {
324                 cumulation = composite.copy();
325             }
326         }
327 
328         this.cumulation = input = ChannelBuffers.wrappedBuffer(cumulation, input);
329         return input;
330     }
331 
332     protected ChannelBuffer updateCumulation(ChannelHandlerContext ctx, ChannelBuffer input) {
333         ChannelBuffer newCumulation;
334         int readableBytes = input.readableBytes();
335         if (readableBytes > 0) {
336             int inputCapacity = input.capacity();
337 
338             // If input.readableBytes() == input.capacity() (i.e. input is full),
339             // there's nothing to save from creating a new cumulation buffer
340             // even if input.capacity() exceeds the threshold, because the new cumulation
341             // buffer will have the same capacity and content with input.
342             if (readableBytes < inputCapacity && inputCapacity > copyThreshold) {
343                 // At least one byte was consumed by callDecode() and input.capacity()
344                 // exceeded the threshold.
345                 cumulation = newCumulation = newCumulationBuffer(ctx, input.readableBytes());
346                 cumulation.writeBytes(input);
347             } else {
348                 // Nothing was consumed by callDecode() or input.capacity() did not
349                 // exceed the threshold.
350                 if (input.readerIndex() != 0) {
351                     cumulation = newCumulation = input.slice();
352                 } else {
353                     cumulation = newCumulation = input;
354                 }
355             }
356         } else {
357             cumulation = newCumulation = null;
358         }
359         return newCumulation;
360     }
361 
362     @Override
363     public void channelDisconnected(
364             ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
365         cleanup(ctx, e);
366     }
367 
368     @Override
369     public void channelClosed(
370             ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
371         cleanup(ctx, e);
372     }
373 
374     @Override
375     public void exceptionCaught(
376             ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
377         ctx.sendUpstream(e);
378     }
379 
380     /**
381      * Decodes the received packets so far into a frame.
382      *
383      * If an sub-class wants to extract a frame out of the buffer it should use
384      * the {@link #extractFrame(ChannelBuffer, int, int)} method,
385      * to make optimizations easier later.
386      *
387      * @param ctx      the context of this handler
388      * @param channel  the current channel
389      * @param buffer   the cumulative buffer of received packets so far.
390      *                 Note that the buffer might be empty, which means you
391      *                 should not make an assumption that the buffer contains
392      *                 at least one byte in your decoder implementation.
393      *
394      * @return the decoded frame if a full frame was received and decoded.
395      *         {@code null} if there's not enough data in the buffer to decode a frame.
396      */
397     protected abstract Object decode(
398             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception;
399 
400     /**
401      * Decodes the received data so far into a frame when the channel is
402      * disconnected.
403      *
404      * @param ctx      the context of this handler
405      * @param channel  the current channel
406      * @param buffer   the cumulative buffer of received packets so far.
407      *                 Note that the buffer might be empty, which means you
408      *                 should not make an assumption that the buffer contains
409      *                 at least one byte in your decoder implementation.
410      *
411      * @return the decoded frame if a full frame was received and decoded.
412      *         {@code null} if there's not enough data in the buffer to decode a frame.
413      */
414     protected Object decodeLast(
415             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
416         return decode(ctx, channel, buffer);
417     }
418 
419     private void callDecode(
420             ChannelHandlerContext context, Channel channel,
421             ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
422 
423         while (cumulation.readable()) {
424             int oldReaderIndex = cumulation.readerIndex();
425             Object frame = decode(context, channel, cumulation);
426             if (frame == null) {
427                 if (oldReaderIndex == cumulation.readerIndex()) {
428                     // Seems like more data is required.
429                     // Let us wait for the next notification.
430                     break;
431                 } else {
432                     // Previous data has been discarded.
433                     // Probably it is reading on.
434                     continue;
435                 }
436             }
437             if (oldReaderIndex == cumulation.readerIndex()) {
438                 throw new IllegalStateException(
439                         "decode() method must read at least one byte " +
440                         "if it returned a frame (caused by: " + getClass() + ')');
441             }
442 
443             unfoldAndFireMessageReceived(context, remoteAddress, frame);
444         }
445     }
446 
447     protected final void unfoldAndFireMessageReceived(
448             ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
449         if (unfold) {
450             if (result instanceof Object[]) {
451                 for (Object r: (Object[]) result) {
452                     Channels.fireMessageReceived(context, r, remoteAddress);
453                 }
454             } else if (result instanceof Iterable<?>) {
455                 for (Object r: (Iterable<?>) result) {
456                     Channels.fireMessageReceived(context, r, remoteAddress);
457                 }
458             } else {
459                 Channels.fireMessageReceived(context, result, remoteAddress);
460             }
461         } else {
462             Channels.fireMessageReceived(context, result, remoteAddress);
463         }
464     }
465 
466     /**
467      * Gets called on {@link #channelDisconnected(ChannelHandlerContext, ChannelStateEvent)} and
468      * {@link #channelClosed(ChannelHandlerContext, ChannelStateEvent)}
469      */
470     protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
471             throws Exception {
472         try {
473             ChannelBuffer cumulation = this.cumulation;
474             if (cumulation == null) {
475                 return;
476             }
477 
478             this.cumulation = null;
479 
480             if (cumulation.readable()) {
481                 // Make sure all frames are read before notifying a closed channel.
482                 callDecode(ctx, ctx.getChannel(), cumulation, null);
483             }
484 
485             // Call decodeLast() finally.  Please note that decodeLast() is
486             // called even if there's nothing more to read from the buffer to
487             // notify a user that the connection was closed explicitly.
488             Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation);
489             if (partialFrame != null) {
490                 unfoldAndFireMessageReceived(ctx, null, partialFrame);
491             }
492         } finally {
493             ctx.sendUpstream(e);
494         }
495     }
496 
497     /**
498      * Create a new {@link ChannelBuffer} which is used for the cumulation.
499      * Sub-classes may override this.
500      *
501      * @param ctx {@link ChannelHandlerContext} for this handler
502      * @return buffer the {@link ChannelBuffer} which is used for cumulation
503      */
504     protected ChannelBuffer newCumulationBuffer(
505             ChannelHandlerContext ctx, int minimumCapacity) {
506         ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
507         return factory.getBuffer(Math.max(minimumCapacity, 256));
508     }
509 
510     /**
511      * Replace this {@link FrameDecoder} in the {@link ChannelPipeline} with the given {@link ChannelHandler}. All
512      * remaining bytes in the {@link ChannelBuffer} will get send to the new {@link ChannelHandler} that was used
513      * as replacement
514      *
515      */
516     public void replace(String handlerName, ChannelHandler handler) {
517         if (ctx == null) {
518             throw new IllegalStateException(
519                     "Replace cann only be called once the FrameDecoder is added to the ChannelPipeline");
520         }
521         ChannelPipeline pipeline = ctx.getPipeline();
522         pipeline.addAfter(ctx.getName(), handlerName, handler);
523 
524         try {
525             if (cumulation != null) {
526                 Channels.fireMessageReceived(ctx, cumulation.readBytes(actualReadableBytes()));
527             }
528         } finally {
529             pipeline.remove(this);
530         }
531     }
532 
533     /**
534      * Returns the actual number of readable bytes in the internal cumulative
535      * buffer of this decoder.  You usually do not need to rely on this value
536      * to write a decoder.  Use it only when you muse use it at your own risk.
537      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
538      */
539     protected int actualReadableBytes() {
540         return internalBuffer().readableBytes();
541     }
542 
543     /**
544      * Returns the internal cumulative buffer of this decoder.  You usually
545      * do not need to access the internal buffer directly to write a decoder.
546      * Use it only when you must use it at your own risk.
547      */
548     protected ChannelBuffer internalBuffer() {
549         ChannelBuffer buf = cumulation;
550         if (buf == null) {
551             return ChannelBuffers.EMPTY_BUFFER;
552         }
553         return buf;
554     }
555 
556     /**
557      * Extract a Frame of the specified buffer. By default this implementation
558      * will return a extract the sub-region of the buffer and create a new one.
559      * If an sub-class want to extract a frame from the buffer it should use this method
560      * by default.
561      *
562      * <strong>Be sure that this method MUST not modify the readerIndex of the given buffer</strong>
563      *
564      */
565     protected ChannelBuffer extractFrame(ChannelBuffer buffer, int index, int length) {
566         ChannelBuffer frame = buffer.factory().getBuffer(length);
567         frame.writeBytes(buffer, index, length);
568         return frame;
569     }
570 
571     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
572         this.ctx = ctx;
573     }
574 
575     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
576         // Nothing to do..
577     }
578 
579     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
580         // Nothing to do..
581     }
582 
583     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
584         // Nothing to do..
585     }
586 
587 }