View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandlerAdapter;
24  import io.netty.channel.socket.ChannelInputShutdownEvent;
25  import io.netty.util.internal.StringUtil;
26  
27  import java.util.List;
28  
29  /**
30   * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
31   * other Message type.
32   *
33   * For example here is an implementation which reads all readable bytes from
34   * the input {@link ByteBuf} and create a new {@link ByteBuf}.
35   *
36   * <pre>
37   *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
38   *         {@code @Override}
39   *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List&lt;Object&gt; out)
40   *                 throws {@link Exception} {
41   *             out.add(in.readBytes(in.readableBytes()));
42   *         }
43   *     }
44   * </pre>
45   *
46   * <h3>Frame detection</h3>
47   * <p>
48   * Generally frame detection should be handled earlier in the pipeline by adding a
49   * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
50   * or {@link LineBasedFrameDecoder}.
51   * <p>
52   * If a custom frame decoder is required, then one needs to be careful when implementing
53   * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
54   * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
55   * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
56   * <p>
57   * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
58   * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
59   * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
60   * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
61   * <h3>Pitfalls</h3>
62   * <p>
63   * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
64   * annotated with {@link @Sharable}.
65   * <p>
66   * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
67   * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
68   * to avoid leaking memory.
69   */
70  public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
71  
72      /**
73       * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
74       */
75      public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
76          @Override
77          public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
78              try {
79                  final ByteBuf buffer;
80                  if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
81                      || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
82                      // Expand cumulation (by replace it) when either there is not more room in the buffer
83                      // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
84                      // duplicate().retain() or if its read-only.
85                      //
86                      // See:
87                      // - https://github.com/netty/netty/issues/2327
88                      // - https://github.com/netty/netty/issues/1764
89                      buffer = expandCumulation(alloc, cumulation, in.readableBytes());
90                  } else {
91                      buffer = cumulation;
92                  }
93                  buffer.writeBytes(in);
94                  return buffer;
95              } finally {
96                  // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
97                  // for whatever release (for example because of OutOfMemoryError)
98                  in.release();
99              }
100         }
101     };
102 
103     /**
104      * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
105      * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
106      * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
107      */
108     public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
109         @Override
110         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
111             ByteBuf buffer;
112             try {
113                 if (cumulation.refCnt() > 1) {
114                     // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the
115                     // user use slice().retain() or duplicate().retain().
116                     //
117                     // See:
118                     // - https://github.com/netty/netty/issues/2327
119                     // - https://github.com/netty/netty/issues/1764
120                     buffer = expandCumulation(alloc, cumulation, in.readableBytes());
121                     buffer.writeBytes(in);
122                 } else {
123                     CompositeByteBuf composite;
124                     if (cumulation instanceof CompositeByteBuf) {
125                         composite = (CompositeByteBuf) cumulation;
126                     } else {
127                         composite = alloc.compositeBuffer(Integer.MAX_VALUE);
128                         composite.addComponent(true, cumulation);
129                     }
130                     composite.addComponent(true, in);
131                     in = null;
132                     buffer = composite;
133                 }
134                 return buffer;
135             } finally {
136                 if (in != null) {
137                     // We must release if the ownership was not transfered as otherwise it may produce a leak if
138                     // writeBytes(...) throw for whatever release (for example because of OutOfMemoryError).
139                     in.release();
140                 }
141             }
142         }
143     };
144 
145     private static final byte STATE_INIT = 0;
146     private static final byte STATE_CALLING_CHILD_DECODE = 1;
147     private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
148 
149     ByteBuf cumulation;
150     private Cumulator cumulator = MERGE_CUMULATOR;
151     private boolean singleDecode;
152     private boolean decodeWasNull;
153     private boolean first;
154     /**
155      * A bitmask where the bits are defined as
156      * <ul>
157      *     <li>{@link #STATE_INIT}</li>
158      *     <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
159      *     <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
160      * </ul>
161      */
162     private byte decodeState = STATE_INIT;
163     private int discardAfterReads = 16;
164     private int numReads;
165 
166     protected ByteToMessageDecoder() {
167         ensureNotSharable();
168     }
169 
170     /**
171      * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
172      * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
173      *
174      * Default is {@code false} as this has performance impacts.
175      */
176     public void setSingleDecode(boolean singleDecode) {
177         this.singleDecode = singleDecode;
178     }
179 
180     /**
181      * If {@code true} then only one message is decoded on each
182      * {@link #channelRead(ChannelHandlerContext, Object)} call.
183      *
184      * Default is {@code false} as this has performance impacts.
185      */
186     public boolean isSingleDecode() {
187         return singleDecode;
188     }
189 
190     /**
191      * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
192      */
193     public void setCumulator(Cumulator cumulator) {
194         if (cumulator == null) {
195             throw new NullPointerException("cumulator");
196         }
197         this.cumulator = cumulator;
198     }
199 
200     /**
201      * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
202      * The default is {@code 16}.
203      */
204     public void setDiscardAfterReads(int discardAfterReads) {
205         if (discardAfterReads <= 0) {
206             throw new IllegalArgumentException("discardAfterReads must be > 0");
207         }
208         this.discardAfterReads = discardAfterReads;
209     }
210 
211     /**
212      * Returns the actual number of readable bytes in the internal cumulative
213      * buffer of this decoder. You usually do not need to rely on this value
214      * to write a decoder. Use it only when you must use it at your own risk.
215      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
216      */
217     protected int actualReadableBytes() {
218         return internalBuffer().readableBytes();
219     }
220 
221     /**
222      * Returns the internal cumulative buffer of this decoder. You usually
223      * do not need to access the internal buffer directly to write a decoder.
224      * Use it only when you must use it at your own risk.
225      */
226     protected ByteBuf internalBuffer() {
227         if (cumulation != null) {
228             return cumulation;
229         } else {
230             return Unpooled.EMPTY_BUFFER;
231         }
232     }
233 
234     @Override
235     public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
236         if (decodeState == STATE_CALLING_CHILD_DECODE) {
237             decodeState = STATE_HANDLER_REMOVED_PENDING;
238             return;
239         }
240         ByteBuf buf = cumulation;
241         if (buf != null) {
242             // Directly set this to null so we are sure we not access it in any other method here anymore.
243             cumulation = null;
244 
245             int readable = buf.readableBytes();
246             if (readable > 0) {
247                 ByteBuf bytes = buf.readBytes(readable);
248                 buf.release();
249                 ctx.fireChannelRead(bytes);
250             } else {
251                 buf.release();
252             }
253 
254             numReads = 0;
255             ctx.fireChannelReadComplete();
256         }
257         handlerRemoved0(ctx);
258     }
259 
260     /**
261      * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
262      * events anymore.
263      */
264     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
265 
266     @Override
267     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
268         if (msg instanceof ByteBuf) {
269             CodecOutputList out = CodecOutputList.newInstance();
270             try {
271                 ByteBuf data = (ByteBuf) msg;
272                 first = cumulation == null;
273                 if (first) {
274                     cumulation = data;
275                 } else {
276                     cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
277                 }
278                 callDecode(ctx, cumulation, out);
279             } catch (DecoderException e) {
280                 throw e;
281             } catch (Exception e) {
282                 throw new DecoderException(e);
283             } finally {
284                 if (cumulation != null && !cumulation.isReadable()) {
285                     numReads = 0;
286                     cumulation.release();
287                     cumulation = null;
288                 } else if (++ numReads >= discardAfterReads) {
289                     // We did enough reads already try to discard some bytes so we not risk to see a OOME.
290                     // See https://github.com/netty/netty/issues/4275
291                     numReads = 0;
292                     discardSomeReadBytes();
293                 }
294 
295                 int size = out.size();
296                 decodeWasNull = !out.insertSinceRecycled();
297                 fireChannelRead(ctx, out, size);
298                 out.recycle();
299             }
300         } else {
301             ctx.fireChannelRead(msg);
302         }
303     }
304 
305     /**
306      * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
307      */
308     static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
309         if (msgs instanceof CodecOutputList) {
310             fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
311         } else {
312             for (int i = 0; i < numElements; i++) {
313                 ctx.fireChannelRead(msgs.get(i));
314             }
315         }
316     }
317 
318     /**
319      * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
320      */
321     static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
322         for (int i = 0; i < numElements; i ++) {
323             ctx.fireChannelRead(msgs.getUnsafe(i));
324         }
325     }
326 
327     @Override
328     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
329         numReads = 0;
330         discardSomeReadBytes();
331         if (decodeWasNull) {
332             decodeWasNull = false;
333             if (!ctx.channel().config().isAutoRead()) {
334                 ctx.read();
335             }
336         }
337         ctx.fireChannelReadComplete();
338     }
339 
340     protected final void discardSomeReadBytes() {
341         if (cumulation != null && !first && cumulation.refCnt() == 1) {
342             // discard some bytes if possible to make more room in the
343             // buffer but only if the refCnt == 1  as otherwise the user may have
344             // used slice().retain() or duplicate().retain().
345             //
346             // See:
347             // - https://github.com/netty/netty/issues/2327
348             // - https://github.com/netty/netty/issues/1764
349             cumulation.discardSomeReadBytes();
350         }
351     }
352 
353     @Override
354     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
355         channelInputClosed(ctx, true);
356     }
357 
358     @Override
359     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
360         if (evt instanceof ChannelInputShutdownEvent) {
361             // The decodeLast method is invoked when a channelInactive event is encountered.
362             // This method is responsible for ending requests in some situations and must be called
363             // when the input has been shutdown.
364             channelInputClosed(ctx, false);
365         }
366         super.userEventTriggered(ctx, evt);
367     }
368 
369     private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
370         CodecOutputList out = CodecOutputList.newInstance();
371         try {
372             channelInputClosed(ctx, out);
373         } catch (DecoderException e) {
374             throw e;
375         } catch (Exception e) {
376             throw new DecoderException(e);
377         } finally {
378             try {
379                 if (cumulation != null) {
380                     cumulation.release();
381                     cumulation = null;
382                 }
383                 int size = out.size();
384                 fireChannelRead(ctx, out, size);
385                 if (size > 0) {
386                     // Something was read, call fireChannelReadComplete()
387                     ctx.fireChannelReadComplete();
388                 }
389                 if (callChannelInactive) {
390                     ctx.fireChannelInactive();
391                 }
392             } finally {
393                 // Recycle in all cases
394                 out.recycle();
395             }
396         }
397     }
398 
399     /**
400      * Called when the input of the channel was closed which may be because it changed to inactive or because of
401      * {@link ChannelInputShutdownEvent}.
402      */
403     void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
404         if (cumulation != null) {
405             callDecode(ctx, cumulation, out);
406             decodeLast(ctx, cumulation, out);
407         } else {
408             decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
409         }
410     }
411 
412     /**
413      * Called once data should be decoded from the given {@link ByteBuf}. This method will call
414      * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
415      *
416      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
417      * @param in            the {@link ByteBuf} from which to read data
418      * @param out           the {@link List} to which decoded messages should be added
419      */
420     protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
421         try {
422             while (in.isReadable()) {
423                 int outSize = out.size();
424 
425                 if (outSize > 0) {
426                     fireChannelRead(ctx, out, outSize);
427                     out.clear();
428 
429                     // Check if this handler was removed before continuing with decoding.
430                     // If it was removed, it is not safe to continue to operate on the buffer.
431                     //
432                     // See:
433                     // - https://github.com/netty/netty/issues/4635
434                     if (ctx.isRemoved()) {
435                         break;
436                     }
437                     outSize = 0;
438                 }
439 
440                 int oldInputLength = in.readableBytes();
441                 decodeRemovalReentryProtection(ctx, in, out);
442 
443                 // Check if this handler was removed before continuing the loop.
444                 // If it was removed, it is not safe to continue to operate on the buffer.
445                 //
446                 // See https://github.com/netty/netty/issues/1664
447                 if (ctx.isRemoved()) {
448                     break;
449                 }
450 
451                 if (outSize == out.size()) {
452                     if (oldInputLength == in.readableBytes()) {
453                         break;
454                     } else {
455                         continue;
456                     }
457                 }
458 
459                 if (oldInputLength == in.readableBytes()) {
460                     throw new DecoderException(
461                             StringUtil.simpleClassName(getClass()) +
462                                     ".decode() did not read anything but decoded a message.");
463                 }
464 
465                 if (isSingleDecode()) {
466                     break;
467                 }
468             }
469         } catch (DecoderException e) {
470             throw e;
471         } catch (Exception cause) {
472             throw new DecoderException(cause);
473         }
474     }
475 
476     /**
477      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
478      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
479      * {@link ByteBuf}.
480      *
481      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
482      * @param in            the {@link ByteBuf} from which to read data
483      * @param out           the {@link List} to which decoded messages should be added
484      * @throws Exception    is thrown if an error occurs
485      */
486     protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
487 
488     /**
489      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
490      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
491      * {@link ByteBuf}.
492      *
493      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
494      * @param in            the {@link ByteBuf} from which to read data
495      * @param out           the {@link List} to which decoded messages should be added
496      * @throws Exception    is thrown if an error occurs
497      */
498     final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
499             throws Exception {
500         decodeState = STATE_CALLING_CHILD_DECODE;
501         try {
502             decode(ctx, in, out);
503         } finally {
504             boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
505             decodeState = STATE_INIT;
506             if (removePending) {
507                 handlerRemoved(ctx);
508             }
509         }
510     }
511 
512     /**
513      * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
514      * {@link #channelInactive(ChannelHandlerContext)} was triggered.
515      *
516      * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
517      * override this for some special cleanup operation.
518      */
519     protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
520         if (in.isReadable()) {
521             // Only call decode() if there is something left in the buffer to decode.
522             // See https://github.com/netty/netty/issues/4386
523             decodeRemovalReentryProtection(ctx, in, out);
524         }
525     }
526 
527     static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
528         ByteBuf oldCumulation = cumulation;
529         cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
530         cumulation.writeBytes(oldCumulation);
531         oldCumulation.release();
532         return cumulation;
533     }
534 
535     /**
536      * Cumulate {@link ByteBuf}s.
537      */
538     public interface Cumulator {
539         /**
540          * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
541          * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
542          * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
543          */
544         ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
545     }
546 }