View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandlerAdapter;
24  import io.netty.channel.socket.ChannelInputShutdownEvent;
25  import io.netty.util.internal.StringUtil;
26  
27  import java.util.List;
28  
29  /**
30   * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
31   * other Message type.
32   *
33   * For example here is an implementation which reads all readable bytes from
34   * the input {@link ByteBuf} and create a new {@link ByteBuf}.
35   *
36   * <pre>
37   *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
38   *         {@code @Override}
39   *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List&lt;Object&gt; out)
40   *                 throws {@link Exception} {
41   *             out.add(in.readBytes(in.readableBytes()));
42   *         }
43   *     }
44   * </pre>
45   *
46   * <h3>Frame detection</h3>
47   * <p>
48   * Generally frame detection should be handled earlier in the pipeline by adding a
49   * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
50   * or {@link LineBasedFrameDecoder}.
51   * <p>
52   * If a custom frame decoder is required, then one needs to be careful when implementing
53   * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
54   * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
55   * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
56   * <p>
57   * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
58   * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
59   * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
60   * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
61   * <h3>Pitfalls</h3>
62   * <p>
63   * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
64   * annotated with {@link @Sharable}.
65   * <p>
66   * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
67   * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
68   * to avoid leaking memory.
69   */
70  public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
71  
72      /**
73       * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
74       */
75      public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
76          @Override
77          public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
78              final ByteBuf buffer;
79              if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
80                      || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
81                  // Expand cumulation (by replace it) when either there is not more room in the buffer
82                  // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
83                  // duplicate().retain() or if its read-only.
84                  //
85                  // See:
86                  // - https://github.com/netty/netty/issues/2327
87                  // - https://github.com/netty/netty/issues/1764
88                  buffer = expandCumulation(alloc, cumulation, in.readableBytes());
89              } else {
90                  buffer = cumulation;
91              }
92              buffer.writeBytes(in);
93              in.release();
94              return buffer;
95          }
96      };
97  
98      /**
99       * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
100      * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
101      * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
102      */
103     public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
104         @Override
105         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
106             ByteBuf buffer;
107             if (cumulation.refCnt() > 1) {
108                 // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user
109                 // use slice().retain() or duplicate().retain().
110                 //
111                 // See:
112                 // - https://github.com/netty/netty/issues/2327
113                 // - https://github.com/netty/netty/issues/1764
114                 buffer = expandCumulation(alloc, cumulation, in.readableBytes());
115                 buffer.writeBytes(in);
116                 in.release();
117             } else {
118                 CompositeByteBuf composite;
119                 if (cumulation instanceof CompositeByteBuf) {
120                     composite = (CompositeByteBuf) cumulation;
121                 } else {
122                     composite = alloc.compositeBuffer(Integer.MAX_VALUE);
123                     composite.addComponent(true, cumulation);
124                 }
125                 composite.addComponent(true, in);
126                 buffer = composite;
127             }
128             return buffer;
129         }
130     };
131 
132     private static final byte STATE_INIT = 0;
133     private static final byte STATE_CALLING_CHILD_DECODE = 1;
134     private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
135 
136     ByteBuf cumulation;
137     private Cumulator cumulator = MERGE_CUMULATOR;
138     private boolean singleDecode;
139     private boolean decodeWasNull;
140     private boolean first;
141     /**
142      * A bitmask where the bits are defined as
143      * <ul>
144      *     <li>{@link #STATE_INIT}</li>
145      *     <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
146      *     <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
147      * </ul>
148      */
149     private byte decodeState = STATE_INIT;
150     private int discardAfterReads = 16;
151     private int numReads;
152 
153     protected ByteToMessageDecoder() {
154         ensureNotSharable();
155     }
156 
157     /**
158      * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
159      * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
160      *
161      * Default is {@code false} as this has performance impacts.
162      */
163     public void setSingleDecode(boolean singleDecode) {
164         this.singleDecode = singleDecode;
165     }
166 
167     /**
168      * If {@code true} then only one message is decoded on each
169      * {@link #channelRead(ChannelHandlerContext, Object)} call.
170      *
171      * Default is {@code false} as this has performance impacts.
172      */
173     public boolean isSingleDecode() {
174         return singleDecode;
175     }
176 
177     /**
178      * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
179      */
180     public void setCumulator(Cumulator cumulator) {
181         if (cumulator == null) {
182             throw new NullPointerException("cumulator");
183         }
184         this.cumulator = cumulator;
185     }
186 
187     /**
188      * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
189      * The default is {@code 16}.
190      */
191     public void setDiscardAfterReads(int discardAfterReads) {
192         if (discardAfterReads <= 0) {
193             throw new IllegalArgumentException("discardAfterReads must be > 0");
194         }
195         this.discardAfterReads = discardAfterReads;
196     }
197 
198     /**
199      * Returns the actual number of readable bytes in the internal cumulative
200      * buffer of this decoder. You usually do not need to rely on this value
201      * to write a decoder. Use it only when you must use it at your own risk.
202      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
203      */
204     protected int actualReadableBytes() {
205         return internalBuffer().readableBytes();
206     }
207 
208     /**
209      * Returns the internal cumulative buffer of this decoder. You usually
210      * do not need to access the internal buffer directly to write a decoder.
211      * Use it only when you must use it at your own risk.
212      */
213     protected ByteBuf internalBuffer() {
214         if (cumulation != null) {
215             return cumulation;
216         } else {
217             return Unpooled.EMPTY_BUFFER;
218         }
219     }
220 
221     @Override
222     public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
223         if (decodeState == STATE_CALLING_CHILD_DECODE) {
224             decodeState = STATE_HANDLER_REMOVED_PENDING;
225             return;
226         }
227         ByteBuf buf = cumulation;
228         if (buf != null) {
229             // Directly set this to null so we are sure we not access it in any other method here anymore.
230             cumulation = null;
231 
232             int readable = buf.readableBytes();
233             if (readable > 0) {
234                 ByteBuf bytes = buf.readBytes(readable);
235                 buf.release();
236                 ctx.fireChannelRead(bytes);
237             } else {
238                 buf.release();
239             }
240 
241             numReads = 0;
242             ctx.fireChannelReadComplete();
243         }
244         handlerRemoved0(ctx);
245     }
246 
247     /**
248      * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
249      * events anymore.
250      */
251     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
252 
253     @Override
254     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
255         if (msg instanceof ByteBuf) {
256             CodecOutputList out = CodecOutputList.newInstance();
257             try {
258                 ByteBuf data = (ByteBuf) msg;
259                 first = cumulation == null;
260                 if (first) {
261                     cumulation = data;
262                 } else {
263                     cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
264                 }
265                 callDecode(ctx, cumulation, out);
266             } catch (DecoderException e) {
267                 throw e;
268             } catch (Exception e) {
269                 throw new DecoderException(e);
270             } finally {
271                 if (cumulation != null && !cumulation.isReadable()) {
272                     numReads = 0;
273                     cumulation.release();
274                     cumulation = null;
275                 } else if (++ numReads >= discardAfterReads) {
276                     // We did enough reads already try to discard some bytes so we not risk to see a OOME.
277                     // See https://github.com/netty/netty/issues/4275
278                     numReads = 0;
279                     discardSomeReadBytes();
280                 }
281 
282                 int size = out.size();
283                 decodeWasNull = !out.insertSinceRecycled();
284                 fireChannelRead(ctx, out, size);
285                 out.recycle();
286             }
287         } else {
288             ctx.fireChannelRead(msg);
289         }
290     }
291 
292     /**
293      * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
294      */
295     static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
296         if (msgs instanceof CodecOutputList) {
297             fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
298         } else {
299             for (int i = 0; i < numElements; i++) {
300                 ctx.fireChannelRead(msgs.get(i));
301             }
302         }
303     }
304 
305     /**
306      * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
307      */
308     static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
309         for (int i = 0; i < numElements; i ++) {
310             ctx.fireChannelRead(msgs.getUnsafe(i));
311         }
312     }
313 
314     @Override
315     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
316         numReads = 0;
317         discardSomeReadBytes();
318         if (decodeWasNull) {
319             decodeWasNull = false;
320             if (!ctx.channel().config().isAutoRead()) {
321                 ctx.read();
322             }
323         }
324         ctx.fireChannelReadComplete();
325     }
326 
327     protected final void discardSomeReadBytes() {
328         if (cumulation != null && !first && cumulation.refCnt() == 1) {
329             // discard some bytes if possible to make more room in the
330             // buffer but only if the refCnt == 1  as otherwise the user may have
331             // used slice().retain() or duplicate().retain().
332             //
333             // See:
334             // - https://github.com/netty/netty/issues/2327
335             // - https://github.com/netty/netty/issues/1764
336             cumulation.discardSomeReadBytes();
337         }
338     }
339 
340     @Override
341     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
342         channelInputClosed(ctx, true);
343     }
344 
345     @Override
346     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
347         if (evt instanceof ChannelInputShutdownEvent) {
348             // The decodeLast method is invoked when a channelInactive event is encountered.
349             // This method is responsible for ending requests in some situations and must be called
350             // when the input has been shutdown.
351             channelInputClosed(ctx, false);
352         }
353         super.userEventTriggered(ctx, evt);
354     }
355 
356     private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
357         CodecOutputList out = CodecOutputList.newInstance();
358         try {
359             channelInputClosed(ctx, out);
360         } catch (DecoderException e) {
361             throw e;
362         } catch (Exception e) {
363             throw new DecoderException(e);
364         } finally {
365             try {
366                 if (cumulation != null) {
367                     cumulation.release();
368                     cumulation = null;
369                 }
370                 int size = out.size();
371                 fireChannelRead(ctx, out, size);
372                 if (size > 0) {
373                     // Something was read, call fireChannelReadComplete()
374                     ctx.fireChannelReadComplete();
375                 }
376                 if (callChannelInactive) {
377                     ctx.fireChannelInactive();
378                 }
379             } finally {
380                 // Recycle in all cases
381                 out.recycle();
382             }
383         }
384     }
385 
386     /**
387      * Called when the input of the channel was closed which may be because it changed to inactive or because of
388      * {@link ChannelInputShutdownEvent}.
389      */
390     void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
391         if (cumulation != null) {
392             callDecode(ctx, cumulation, out);
393             decodeLast(ctx, cumulation, out);
394         } else {
395             decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
396         }
397     }
398 
399     /**
400      * Called once data should be decoded from the given {@link ByteBuf}. This method will call
401      * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
402      *
403      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
404      * @param in            the {@link ByteBuf} from which to read data
405      * @param out           the {@link List} to which decoded messages should be added
406      */
407     protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
408         try {
409             while (in.isReadable()) {
410                 int outSize = out.size();
411 
412                 if (outSize > 0) {
413                     fireChannelRead(ctx, out, outSize);
414                     out.clear();
415 
416                     // Check if this handler was removed before continuing with decoding.
417                     // If it was removed, it is not safe to continue to operate on the buffer.
418                     //
419                     // See:
420                     // - https://github.com/netty/netty/issues/4635
421                     if (ctx.isRemoved()) {
422                         break;
423                     }
424                     outSize = 0;
425                 }
426 
427                 int oldInputLength = in.readableBytes();
428                 decodeRemovalReentryProtection(ctx, in, out);
429 
430                 // Check if this handler was removed before continuing the loop.
431                 // If it was removed, it is not safe to continue to operate on the buffer.
432                 //
433                 // See https://github.com/netty/netty/issues/1664
434                 if (ctx.isRemoved()) {
435                     break;
436                 }
437 
438                 if (outSize == out.size()) {
439                     if (oldInputLength == in.readableBytes()) {
440                         break;
441                     } else {
442                         continue;
443                     }
444                 }
445 
446                 if (oldInputLength == in.readableBytes()) {
447                     throw new DecoderException(
448                             StringUtil.simpleClassName(getClass()) +
449                                     ".decode() did not read anything but decoded a message.");
450                 }
451 
452                 if (isSingleDecode()) {
453                     break;
454                 }
455             }
456         } catch (DecoderException e) {
457             throw e;
458         } catch (Exception cause) {
459             throw new DecoderException(cause);
460         }
461     }
462 
463     /**
464      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
465      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
466      * {@link ByteBuf}.
467      *
468      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
469      * @param in            the {@link ByteBuf} from which to read data
470      * @param out           the {@link List} to which decoded messages should be added
471      * @throws Exception    is thrown if an error occurs
472      */
473     protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
474 
475     /**
476      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
477      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
478      * {@link ByteBuf}.
479      *
480      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
481      * @param in            the {@link ByteBuf} from which to read data
482      * @param out           the {@link List} to which decoded messages should be added
483      * @throws Exception    is thrown if an error occurs
484      */
485     final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
486             throws Exception {
487         decodeState = STATE_CALLING_CHILD_DECODE;
488         try {
489             decode(ctx, in, out);
490         } finally {
491             boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
492             decodeState = STATE_INIT;
493             if (removePending) {
494                 handlerRemoved(ctx);
495             }
496         }
497     }
498 
499     /**
500      * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
501      * {@link #channelInactive(ChannelHandlerContext)} was triggered.
502      *
503      * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
504      * override this for some special cleanup operation.
505      */
506     protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
507         if (in.isReadable()) {
508             // Only call decode() if there is something left in the buffer to decode.
509             // See https://github.com/netty/netty/issues/4386
510             decodeRemovalReentryProtection(ctx, in, out);
511         }
512     }
513 
514     static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
515         ByteBuf oldCumulation = cumulation;
516         cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
517         cumulation.writeBytes(oldCumulation);
518         oldCumulation.release();
519         return cumulation;
520     }
521 
522     /**
523      * Cumulate {@link ByteBuf}s.
524      */
525     public interface Cumulator {
526         /**
527          * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
528          * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
529          * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
530          */
531         ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
532     }
533 }