View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerAdapter;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.util.internal.RecyclableArrayList;
26  import io.netty.util.internal.StringUtil;
27  
28  import java.util.List;
29  
30  /**
31   * A {@link ChannelHandler} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
32   * other Message type.
33   *
34   * For example here is an implementation which reads all readable bytes from
35   * the input {@link ByteBuf} and create a new {@link ByteBuf}.
36   *
37   * <pre>
38   *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
39   *         {@code @Override}
40   *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List&lt;Object&gt; out)
41   *                 throws {@link Exception} {
42   *             out.add(in.readBytes(in.readableBytes()));
43   *         }
44   *     }
45   * </pre>
46   *
47   * <h3>Frame detection</h3>
48   * <p>
49   * Generally frame detection should be handled earlier in the pipeline by adding a
50   * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
51   * or {@link LineBasedFrameDecoder}.
52   * <p>
53   * If a custom frame decoder is required, then one needs to be careful when implementing
54   * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
55   * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
56   * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
57   * <p>
58   * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
59   * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
60   * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
61   * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
62   * <h3>Pitfalls</h3>
63   * <p>
64   * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
65   * annotated with {@link @Sharable}.
66   * <p>
67   * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
68   * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
69   * to avoid leaking memory.
70   */
71  public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
72  
73      /**
74       * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
75       */
76      public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
77          @Override
78          public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
79              ByteBuf buffer;
80              if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
81                      || cumulation.refCnt() > 1) {
82                  // Expand cumulation (by replace it) when either there is not more room in the buffer
83                  // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
84                  // duplicate().retain().
85                  //
86                  // See:
87                  // - https://github.com/netty/netty/issues/2327
88                  // - https://github.com/netty/netty/issues/1764
89                  buffer = expandCumulation(alloc, cumulation, in.readableBytes());
90              } else {
91                  buffer = cumulation;
92              }
93              buffer.writeBytes(in);
94              in.release();
95              return buffer;
96          }
97      };
98  
99      /**
100      * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
101      * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
102      * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
103      */
104     public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
105         @Override
106         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
107             ByteBuf buffer;
108             if (cumulation.refCnt() > 1) {
109                 // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user
110                 // use slice().retain() or duplicate().retain().
111                 //
112                 // See:
113                 // - https://github.com/netty/netty/issues/2327
114                 // - https://github.com/netty/netty/issues/1764
115                 buffer = expandCumulation(alloc, cumulation, in.readableBytes());
116                 buffer.writeBytes(in);
117                 in.release();
118             } else {
119                 CompositeByteBuf composite;
120                 if (cumulation instanceof CompositeByteBuf) {
121                     composite = (CompositeByteBuf) cumulation;
122                 } else {
123                     int readable = cumulation.readableBytes();
124                     composite = alloc.compositeBuffer();
125                     composite.addComponent(cumulation).writerIndex(readable);
126                 }
127                 composite.addComponent(in).writerIndex(composite.writerIndex() + in.readableBytes());
128                 buffer = composite;
129             }
130             return buffer;
131         }
132     };
133 
134     ByteBuf cumulation;
135     private Cumulator cumulator = MERGE_CUMULATOR;
136     private boolean singleDecode;
137     private boolean first;
138 
139     protected ByteToMessageDecoder() {
140         CodecUtil.ensureNotSharable(this);
141     }
142 
143     /**
144      * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
145      * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
146      *
147      * Default is {@code false} as this has performance impacts.
148      */
149     public void setSingleDecode(boolean singleDecode) {
150         this.singleDecode = singleDecode;
151     }
152 
153     /**
154      * If {@code true} then only one message is decoded on each
155      * {@link #channelRead(ChannelHandlerContext, Object)} call.
156      *
157      * Default is {@code false} as this has performance impacts.
158      */
159     public boolean isSingleDecode() {
160         return singleDecode;
161     }
162 
163     /**
164      * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
165      */
166     public void setCumulator(Cumulator cumulator) {
167         if (cumulator == null) {
168             throw new NullPointerException("cumulator");
169         }
170         this.cumulator = cumulator;
171     }
172 
173     /**
174      * Returns the actual number of readable bytes in the internal cumulative
175      * buffer of this decoder. You usually do not need to rely on this value
176      * to write a decoder. Use it only when you must use it at your own risk.
177      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
178      */
179     protected int actualReadableBytes() {
180         return internalBuffer().readableBytes();
181     }
182 
183     /**
184      * Returns the internal cumulative buffer of this decoder. You usually
185      * do not need to access the internal buffer directly to write a decoder.
186      * Use it only when you must use it at your own risk.
187      */
188     protected ByteBuf internalBuffer() {
189         if (cumulation != null) {
190             return cumulation;
191         } else {
192             return Unpooled.EMPTY_BUFFER;
193         }
194     }
195 
196     @Override
197     public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
198         ByteBuf buf = internalBuffer();
199         int readable = buf.readableBytes();
200         if (readable > 0) {
201             ByteBuf bytes = buf.readBytes(readable);
202             buf.release();
203             ctx.fireChannelRead(bytes);
204             ctx.fireChannelReadComplete();
205         } else {
206             buf.release();
207         }
208         cumulation = null;
209         handlerRemoved0(ctx);
210     }
211 
212     /**
213      * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
214      * events anymore.
215      */
216     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
217 
218     @Override
219     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
220         if (msg instanceof ByteBuf) {
221             RecyclableArrayList out = RecyclableArrayList.newInstance();
222             try {
223                 ByteBuf data = (ByteBuf) msg;
224                 first = cumulation == null;
225                 if (first) {
226                     cumulation = data;
227                 } else {
228                     cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
229                 }
230                 callDecode(ctx, cumulation, out);
231             } catch (DecoderException e) {
232                 throw e;
233             } catch (Throwable t) {
234                 throw new DecoderException(t);
235             } finally {
236                 if (cumulation != null && !cumulation.isReadable()) {
237                     cumulation.release();
238                     cumulation = null;
239                 }
240                 int size = out.size();
241 
242                 for (int i = 0; i < size; i ++) {
243                     ctx.fireChannelRead(out.get(i));
244                 }
245                 out.recycle();
246             }
247         } else {
248             ctx.fireChannelRead(msg);
249         }
250     }
251 
252     @Override
253     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
254         if (cumulation != null && !first && cumulation.refCnt() == 1) {
255             // discard some bytes if possible to make more room in the
256             // buffer but only if the refCnt == 1  as otherwise the user may have
257             // used slice().retain() or duplicate().retain().
258             //
259             // See:
260             // - https://github.com/netty/netty/issues/2327
261             // - https://github.com/netty/netty/issues/1764
262             cumulation.discardSomeReadBytes();
263         }
264         ctx.fireChannelReadComplete();
265     }
266 
267     @Override
268     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
269         RecyclableArrayList out = RecyclableArrayList.newInstance();
270         try {
271             if (cumulation != null) {
272                 callDecode(ctx, cumulation, out);
273                 decodeLast(ctx, cumulation, out);
274             } else {
275                 decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
276             }
277         } catch (DecoderException e) {
278             throw e;
279         } catch (Exception e) {
280             throw new DecoderException(e);
281         } finally {
282             try {
283                 if (cumulation != null) {
284                     cumulation.release();
285                     cumulation = null;
286                 }
287                 int size = out.size();
288                 for (int i = 0; i < size; i++) {
289                     ctx.fireChannelRead(out.get(i));
290                 }
291                 if (size > 0) {
292                     // Something was read, call fireChannelReadComplete()
293                     ctx.fireChannelReadComplete();
294                 }
295                 ctx.fireChannelInactive();
296             } finally {
297                 // recycle in all cases
298                 out.recycle();
299             }
300         }
301     }
302 
303     /**
304      * Called once data should be decoded from the given {@link ByteBuf}. This method will call
305      * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
306      *
307      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
308      * @param in            the {@link ByteBuf} from which to read data
309      * @param out           the {@link List} to which decoded messages should be added
310      */
311     protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
312         try {
313             while (in.isReadable()) {
314                 int outSize = out.size();
315                 int oldInputLength = in.readableBytes();
316                 decode(ctx, in, out);
317 
318                 // Check if this handler was removed before continuing the loop.
319                 // If it was removed, it is not safe to continue to operate on the buffer.
320                 //
321                 // See https://github.com/netty/netty/issues/1664
322                 if (ctx.isRemoved()) {
323                     break;
324                 }
325 
326                 if (outSize == out.size()) {
327                     if (oldInputLength == in.readableBytes()) {
328                         break;
329                     } else {
330                         continue;
331                     }
332                 }
333 
334                 if (oldInputLength == in.readableBytes()) {
335                     throw new DecoderException(
336                             StringUtil.simpleClassName(getClass()) +
337                             ".decode() did not read anything but decoded a message.");
338                 }
339 
340                 if (isSingleDecode()) {
341                     break;
342                 }
343             }
344         } catch (DecoderException e) {
345             throw e;
346         } catch (Throwable cause) {
347             throw new DecoderException(cause);
348         }
349     }
350 
351     /**
352      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
353      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
354      * {@link ByteBuf}.
355      *
356      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
357      * @param in            the {@link ByteBuf} from which to read data
358      * @param out           the {@link List} to which decoded messages should be added
359      * @throws Exception    is thrown if an error accour
360      */
361     protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
362 
363     /**
364      * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
365      * {@link #channelInactive(ChannelHandlerContext)} was triggered.
366      *
367      * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
368      * override this for some special cleanup operation.
369      */
370     protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
371         decode(ctx, in, out);
372     }
373 
374     static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
375         ByteBuf oldCumulation = cumulation;
376         cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
377         cumulation.writeBytes(oldCumulation);
378         oldCumulation.release();
379         return cumulation;
380     }
381 
382     /**
383      * Cumulate {@link ByteBuf}s.
384      */
385     public interface Cumulator {
386         /**
387          * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
388          * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
389          * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
390          */
391         ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
392     }
393 }