View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.frame;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.buffer.ChannelBufferFactory;
20  import org.jboss.netty.buffer.ChannelBuffers;
21  import org.jboss.netty.buffer.CompositeChannelBuffer;
22  import org.jboss.netty.channel.Channel;
23  import org.jboss.netty.channel.ChannelHandler;
24  import org.jboss.netty.channel.ChannelHandlerContext;
25  import org.jboss.netty.channel.ChannelPipeline;
26  import org.jboss.netty.channel.ChannelStateEvent;
27  import org.jboss.netty.channel.ChannelUpstreamHandler;
28  import org.jboss.netty.channel.Channels;
29  import org.jboss.netty.channel.ExceptionEvent;
30  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
31  import org.jboss.netty.channel.MessageEvent;
32  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
33  import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
34  
35  import java.net.SocketAddress;
36  
37  /**
38   * Decodes the received {@link ChannelBuffer}s into a meaningful frame object.
39   * <p>
40   * In a stream-based transport such as TCP/IP, packets can be fragmented and
41   * reassembled during transmission even in a LAN environment.  For example,
42   * let us assume you have received three packets:
43   * <pre>
44   * +-----+-----+-----+
45   * | ABC | DEF | GHI |
46   * +-----+-----+-----+
47   * </pre>
48   * because of the packet fragmentation, a server can receive them like the
49   * following:
50   * <pre>
51   * +----+-------+---+---+
52   * | AB | CDEFG | H | I |
53   * +----+-------+---+---+
54   * </pre>
55   * <p>
56   * {@link FrameDecoder} helps you defrag the received packets into one or more
57   * meaningful <strong>frames</strong> that could be easily understood by the
58   * application logic.  In case of the example above, your {@link FrameDecoder}
59   * implementation could defrag the received packets like the following:
60   * <pre>
61   * +-----+-----+-----+
62   * | ABC | DEF | GHI |
63   * +-----+-----+-----+
64   * </pre>
65   * <p>
66   * The following code shows an example handler which decodes a frame whose
67   * first 4 bytes header represents the length of the frame, excluding the
68   * header.
69   * <pre>
70   * MESSAGE FORMAT
71   * ==============
72   *
73   * Offset:  0        4                   (Length + 4)
74   *          +--------+------------------------+
75   * Fields:  | Length | Actual message content |
76   *          +--------+------------------------+
77   *
78   * DECODER IMPLEMENTATION
79   * ======================
80   *
81   * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
82   *
83   *   {@code @Override}
84   *   protected Object decode({@link ChannelHandlerContext} ctx,
85   *                           {@link Channel} channel,
86   *                           {@link ChannelBuffer} buf) throws Exception {
87   *
88   *     // Make sure the length field has been received.
89   *     if (buf.readableBytes() &lt; 4) {
90   *        // The length field was not received yet - return null.
91   *        // This method will be invoked again when more packets are
92   *        // received and appended to the buffer.
93   *        return <strong>null</strong>;
94   *     }
95   *
96   *     // The length field is in the buffer.
97   *
98   *     // Mark the current buffer position before reading the length field
99   *     // because the whole frame might not be in the buffer yet.
100  *     // We will reset the buffer position to the marked position if
101  *     // there's not enough bytes in the buffer.
102  *     buf.markReaderIndex();
103  *
104  *     // Read the length field.
105  *     int length = buf.readInt();
106  *
107  *     // Make sure there are enough bytes in the buffer.
108  *     if (buf.readableBytes() &lt; length) {
109  *        // The whole bytes were not received yet - return null.
110  *        // This method will be invoked again when more packets are
111  *        // received and appended to the buffer.
112  *
113  *        // Reset to the marked position to read the length field again
114  *        // next time.
115  *        buf.resetReaderIndex();
116  *
117  *        return <strong>null</strong>;
118  *     }
119  *
120  *     // There's enough bytes in the buffer. Read it.
121  *     {@link ChannelBuffer} frame = buf.readBytes(length);
122  *
123  *     // Successfully decoded a frame.  Return the decoded frame.
124  *     return <strong>frame</strong>;
125  *   }
126  * }
127  * </pre>
128  *
129  * <h3>Returning a POJO rather than a {@link ChannelBuffer}</h3>
130  * <p>
131  * Please note that you can return an object of a different type than
132  * {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()}
133  * implementation.  For example, you could return a
134  * <a href="http://en.wikipedia.org/wiki/POJO">POJO</a> so that the next
135  * {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which
136  * contains a POJO rather than a {@link ChannelBuffer}.
137  *
138  * <h3>Replacing a decoder with another decoder in a pipeline</h3>
139  * <p>
140  * If you are going to write a protocol multiplexer, you will probably want to
141  * replace a {@link FrameDecoder} (protocol detector) with another
142  * {@link FrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder).
143  * It is not possible to achieve this simply by calling
144  * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
145  * some additional steps are required:
146  * <pre>
147  * public class FirstDecoder extends {@link FrameDecoder} {
148  *
149  *     public FirstDecoder() {
150  *         super(true); // Enable unfold
151  *     }
152  *
153  *     {@code @Override}
154  *     protected Object decode({@link ChannelHandlerContext} ctx,
155  *                             {@link Channel} channel,
156  *                             {@link ChannelBuffer} buf) {
157  *         ...
158  *         // Decode the first message
159  *         Object firstMessage = ...;
160  *
161  *         // Add the second decoder
162  *         ctx.getPipeline().addLast("second", new SecondDecoder());
163  *
164  *         // Remove the first decoder (me)
165  *         ctx.getPipeline().remove(this);
166  *
167  *         if (buf.readable()) {
168  *             // Hand off the remaining data to the second decoder
169  *             return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };
170  *         } else {
171  *             // Nothing to hand off
172  *             return firstMessage;
173  *         }
174  *     }
175  * }
176  * </pre>
177  *
178  * @apiviz.landmark
179  */
180 public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
181 
182     public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
183 
184     private boolean unfold;
185     protected ChannelBuffer cumulation;
186     private volatile ChannelHandlerContext ctx;
187     private int copyThreshold;
188     private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
189 
190     protected FrameDecoder() {
191         this(false);
192     }
193 
194     protected FrameDecoder(boolean unfold) {
195         this.unfold = unfold;
196     }
197 
198     public final boolean isUnfold() {
199         return unfold;
200     }
201 
202     public final void setUnfold(boolean unfold) {
203         if (ctx == null) {
204             this.unfold = unfold;
205         } else {
206             throw new IllegalStateException(
207                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
208         }
209     }
210 
211     /**
212      * See {@link #setMaxCumulationBufferCapacity(int)} for explaintation of this setting
213      *
214      */
215     public final int getMaxCumulationBufferCapacity() {
216         return copyThreshold;
217     }
218 
219     /**
220      * Set the maximal capacity of the internal cumulation ChannelBuffer to use
221      * before the {@link FrameDecoder} tries to minimize the memory usage by
222      * "byte copy".
223      *
224      *
225      * What you use here really depends on your application and need. Using
226      * {@link Integer#MAX_VALUE} will disable all byte copies but give you the
227      * cost of a higher memory usage if big {@link ChannelBuffer}'s will be
228      * received.
229      *
230      * By default a threshold of <code>0</code> is used, which means it will
231      * always copy to try to reduce memory usage
232      *
233      *
234      * @param copyThreshold
235      *            the threshold (in bytes) or {@link Integer#MAX_VALUE} to
236      *            disable it. The value must be at least 0
237      * @throws IllegalStateException
238      *             get thrown if someone tries to change this setting after the
239      *             Decoder was added to the {@link ChannelPipeline}
240      */
241     public final void setMaxCumulationBufferCapacity(int copyThreshold) {
242         if (copyThreshold < 0) {
243             throw new IllegalArgumentException("maxCumulationBufferCapacity must be >= 0");
244         }
245         if (ctx == null) {
246             this.copyThreshold = copyThreshold;
247         } else {
248             throw new IllegalStateException(
249                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
250         }
251     }
252 
253     /**
254      * Returns the maximum number of components in the cumulation buffer.  If the number of
255      * the components in the cumulation buffer exceeds this value, the components of the
256      * cumulation buffer are consolidated into a single component, involving memory copies.
257      * The default value of this property {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
258      */
259     public final int getMaxCumulationBufferComponents() {
260         return maxCumulationBufferComponents;
261     }
262 
263     /**
264      * Sets the maximum number of components in the cumulation buffer.  If the number of
265      * the components in the cumulation buffer exceeds this value, the components of the
266      * cumulation buffer are consolidated into a single component, involving memory copies.
267      * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
268      * and its minimum allowed value is {@code 2}.
269      */
270     public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
271         if (maxCumulationBufferComponents < 2) {
272             throw new IllegalArgumentException(
273                     "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
274                     " (expected: >= 2)");
275         }
276 
277         if (ctx == null) {
278             this.maxCumulationBufferComponents = maxCumulationBufferComponents;
279         } else {
280             throw new IllegalStateException(
281                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
282         }
283     }
284 
285     @Override
286     public void messageReceived(
287             ChannelHandlerContext ctx, MessageEvent e) throws Exception {
288 
289         Object m = e.getMessage();
290         if (!(m instanceof ChannelBuffer)) {
291             ctx.sendUpstream(e);
292             return;
293         }
294 
295         ChannelBuffer input = (ChannelBuffer) m;
296         if (!input.readable()) {
297             return;
298         }
299 
300         if (cumulation == null) {
301             try {
302                 // the cumulation buffer is not created yet so just pass the input to callDecode(...) method
303                 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
304             } finally {
305                 updateCumulation(ctx, input);
306             }
307 
308         } else {
309             input = appendToCumulation(input);
310             try {
311                 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
312             } finally {
313                 updateCumulation(ctx, input);
314             }
315         }
316     }
317 
318     protected ChannelBuffer appendToCumulation(ChannelBuffer input) {
319         ChannelBuffer cumulation = this.cumulation;
320         assert cumulation.readable();
321         if (cumulation instanceof CompositeChannelBuffer) {
322             // Make sure the resulting cumulation buffer has no more than the configured components.
323             CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
324             if (composite.numComponents() >= maxCumulationBufferComponents) {
325                 cumulation = composite.copy();
326             }
327         }
328 
329         this.cumulation = input = ChannelBuffers.wrappedBuffer(cumulation, input);
330         return input;
331     }
332 
333     protected ChannelBuffer updateCumulation(ChannelHandlerContext ctx, ChannelBuffer input) {
334         ChannelBuffer newCumulation;
335         int readableBytes = input.readableBytes();
336         if (readableBytes > 0) {
337             int inputCapacity = input.capacity();
338 
339             // If input.readableBytes() == input.capacity() (i.e. input is full),
340             // there's nothing to save from creating a new cumulation buffer
341             // even if input.capacity() exceeds the threshold, because the new cumulation
342             // buffer will have the same capacity and content with input.
343             if (readableBytes < inputCapacity && inputCapacity > copyThreshold) {
344                 // At least one byte was consumed by callDecode() and input.capacity()
345                 // exceeded the threshold.
346                 cumulation = newCumulation = newCumulationBuffer(ctx, input.readableBytes());
347                 cumulation.writeBytes(input);
348             } else {
349                 // Nothing was consumed by callDecode() or input.capacity() did not
350                 // exceed the threshold.
351                 if (input.readerIndex() != 0) {
352                     cumulation = newCumulation = input.slice();
353                 } else {
354                     cumulation = newCumulation = input;
355                 }
356             }
357         } else {
358             cumulation = newCumulation = null;
359         }
360         return newCumulation;
361     }
362 
363     @Override
364     public void channelDisconnected(
365             ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
366         cleanup(ctx, e);
367     }
368 
369     @Override
370     public void channelClosed(
371             ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
372         cleanup(ctx, e);
373     }
374 
375     @Override
376     public void exceptionCaught(
377             ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
378         ctx.sendUpstream(e);
379     }
380 
381     /**
382      * Decodes the received packets so far into a frame.
383      *
384      * @param ctx      the context of this handler
385      * @param channel  the current channel
386      * @param buffer   the cumulative buffer of received packets so far.
387      *                 Note that the buffer might be empty, which means you
388      *                 should not make an assumption that the buffer contains
389      *                 at least one byte in your decoder implementation.
390      *
391      * @return the decoded frame if a full frame was received and decoded.
392      *         {@code null} if there's not enough data in the buffer to decode a frame.
393      */
394     protected abstract Object decode(
395             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception;
396 
397     /**
398      * Decodes the received data so far into a frame when the channel is
399      * disconnected.
400      *
401      * @param ctx      the context of this handler
402      * @param channel  the current channel
403      * @param buffer   the cumulative buffer of received packets so far.
404      *                 Note that the buffer might be empty, which means you
405      *                 should not make an assumption that the buffer contains
406      *                 at least one byte in your decoder implementation.
407      *
408      * @return the decoded frame if a full frame was received and decoded.
409      *         {@code null} if there's not enough data in the buffer to decode a frame.
410      */
411     protected Object decodeLast(
412             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
413         return decode(ctx, channel, buffer);
414     }
415 
416     private void callDecode(
417             ChannelHandlerContext context, Channel channel,
418             ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
419 
420         while (cumulation.readable()) {
421             int oldReaderIndex = cumulation.readerIndex();
422             Object frame = decode(context, channel, cumulation);
423             if (frame == null) {
424                 if (oldReaderIndex == cumulation.readerIndex()) {
425                     // Seems like more data is required.
426                     // Let us wait for the next notification.
427                     break;
428                 } else {
429                     // Previous data has been discarded.
430                     // Probably it is reading on.
431                     continue;
432                 }
433             } else if (oldReaderIndex == cumulation.readerIndex()) {
434                 throw new IllegalStateException(
435                         "decode() method must read at least one byte " +
436                         "if it returned a frame (caused by: " + getClass() + ')');
437             }
438 
439             unfoldAndFireMessageReceived(context, remoteAddress, frame);
440         }
441     }
442 
443     protected final void unfoldAndFireMessageReceived(
444             ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
445         if (unfold) {
446             if (result instanceof Object[]) {
447                 for (Object r: (Object[]) result) {
448                     Channels.fireMessageReceived(context, r, remoteAddress);
449                 }
450             } else if (result instanceof Iterable<?>) {
451                 for (Object r: (Iterable<?>) result) {
452                     Channels.fireMessageReceived(context, r, remoteAddress);
453                 }
454             } else {
455                 Channels.fireMessageReceived(context, result, remoteAddress);
456             }
457         } else {
458             Channels.fireMessageReceived(context, result, remoteAddress);
459         }
460     }
461 
462     /**
463      * Gets called on {@link #channelDisconnected(ChannelHandlerContext, ChannelStateEvent)} and
464      * {@link #channelClosed(ChannelHandlerContext, ChannelStateEvent)}
465      */
466     protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
467             throws Exception {
468         try {
469             ChannelBuffer cumulation = this.cumulation;
470             if (cumulation == null) {
471                 return;
472             }
473 
474             this.cumulation = null;
475 
476             if (cumulation.readable()) {
477                 // Make sure all frames are read before notifying a closed channel.
478                 callDecode(ctx, ctx.getChannel(), cumulation, null);
479             }
480 
481             // Call decodeLast() finally.  Please note that decodeLast() is
482             // called even if there's nothing more to read from the buffer to
483             // notify a user that the connection was closed explicitly.
484             Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation);
485             if (partialFrame != null) {
486                 unfoldAndFireMessageReceived(ctx, null, partialFrame);
487             }
488         } finally {
489             ctx.sendUpstream(e);
490         }
491     }
492 
493     /**
494      * Create a new {@link ChannelBuffer} which is used for the cumulation.
495      * Sub-classes may override this.
496      *
497      * @param ctx {@link ChannelHandlerContext} for this handler
498      * @return buffer the {@link ChannelBuffer} which is used for cumulation
499      */
500     protected ChannelBuffer newCumulationBuffer(
501             ChannelHandlerContext ctx, int minimumCapacity) {
502         ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
503         return factory.getBuffer(Math.max(minimumCapacity, 256));
504     }
505 
506     /**
507      * Replace this {@link FrameDecoder} in the {@link ChannelPipeline} with the given {@link ChannelHandler}. All
508      * remaining bytes in the {@link ChannelBuffer} will get send to the new {@link ChannelHandler} that was used
509      * as replacement
510      *
511      */
512     public void replace(String handlerName, ChannelHandler handler) {
513         if (ctx == null) {
514             throw new IllegalStateException(
515                     "Replace cann only be called once the FrameDecoder is added to the ChannelPipeline");
516         }
517         ChannelPipeline pipeline = ctx.getPipeline();
518         pipeline.addAfter(ctx.getName(), handlerName, handler);
519 
520         try {
521             if (cumulation != null) {
522                 Channels.fireMessageReceived(ctx, cumulation.readBytes(actualReadableBytes()));
523             }
524         } finally {
525             pipeline.remove(this);
526         }
527 
528     }
529 
530     /**
531      * Returns the actual number of readable bytes in the internal cumulative
532      * buffer of this decoder.  You usually do not need to rely on this value
533      * to write a decoder.  Use it only when you muse use it at your own risk.
534      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
535      */
536     protected int actualReadableBytes() {
537         return internalBuffer().readableBytes();
538     }
539 
540 
541 
542     /**
543      * Returns the internal cumulative buffer of this decoder.  You usually
544      * do not need to access the internal buffer directly to write a decoder.
545      * Use it only when you must use it at your own risk.
546      */
547     protected ChannelBuffer internalBuffer() {
548         ChannelBuffer buf = cumulation;
549         if (buf == null) {
550             return ChannelBuffers.EMPTY_BUFFER;
551         }
552         return buf;
553     }
554 
555     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
556         this.ctx = ctx;
557     }
558 
559     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
560         // Nothing to do..
561     }
562 
563     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
564         // Nothing to do..
565     }
566 
567     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
568         // Nothing to do..
569     }
570 
571 }