View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.buffer.ReadOnlyByteBuf;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInboundHandlerAdapter;
25  import io.netty.channel.socket.ChannelInputShutdownEvent;
26  import io.netty.util.internal.StringUtil;
27  
28  import java.util.List;
29  
30  /**
31   * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
32   * other Message type.
33   *
34   * For example here is an implementation which reads all readable bytes from
35   * the input {@link ByteBuf} and create a new {@link ByteBuf}.
36   *
37   * <pre>
38   *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
39   *         {@code @Override}
40   *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List&lt;Object&gt; out)
41   *                 throws {@link Exception} {
42   *             out.add(in.readBytes(in.readableBytes()));
43   *         }
44   *     }
45   * </pre>
46   *
47   * <h3>Frame detection</h3>
48   * <p>
49   * Generally frame detection should be handled earlier in the pipeline by adding a
50   * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
51   * or {@link LineBasedFrameDecoder}.
52   * <p>
53   * If a custom frame decoder is required, then one needs to be careful when implementing
54   * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
55   * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
56   * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
57   * <p>
58   * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
59   * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
60   * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
61   * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
62   * <h3>Pitfalls</h3>
63   * <p>
64   * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
65   * annotated with {@link @Sharable}.
66   * <p>
67   * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
68   * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
69   * to avoid leaking memory.
70   */
71  public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
72  
73      /**
74       * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
75       */
76      public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
77          @SuppressWarnings("deprecation")
78          @Override
79          public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
80              final ByteBuf buffer;
81              if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
82                      || cumulation.refCnt() > 1 || cumulation instanceof ReadOnlyByteBuf) {
83                  // Expand cumulation (by replace it) when either there is not more room in the buffer
84                  // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
85                  // duplicate().retain() or if its read-only.
86                  //
87                  // See:
88                  // - https://github.com/netty/netty/issues/2327
89                  // - https://github.com/netty/netty/issues/1764
90                  buffer = expandCumulation(alloc, cumulation, in.readableBytes());
91              } else {
92                  buffer = cumulation;
93              }
94              buffer.writeBytes(in);
95              in.release();
96              return buffer;
97          }
98      };
99  
100     /**
101      * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
102      * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
103      * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
104      */
105     public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
106         @Override
107         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
108             ByteBuf buffer;
109             if (cumulation.refCnt() > 1) {
110                 // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user
111                 // use slice().retain() or duplicate().retain().
112                 //
113                 // See:
114                 // - https://github.com/netty/netty/issues/2327
115                 // - https://github.com/netty/netty/issues/1764
116                 buffer = expandCumulation(alloc, cumulation, in.readableBytes());
117                 buffer.writeBytes(in);
118                 in.release();
119             } else {
120                 CompositeByteBuf composite;
121                 if (cumulation instanceof CompositeByteBuf) {
122                     composite = (CompositeByteBuf) cumulation;
123                 } else {
124                     composite = alloc.compositeBuffer(Integer.MAX_VALUE);
125                     composite.addComponent(true, cumulation);
126                 }
127                 composite.addComponent(true, in);
128                 buffer = composite;
129             }
130             return buffer;
131         }
132     };
133 
134     private static final byte STATE_INIT = 0;
135     private static final byte STATE_CALLING_CHILD_DECODE = 1;
136     private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
137 
138     ByteBuf cumulation;
139     private Cumulator cumulator = MERGE_CUMULATOR;
140     private boolean singleDecode;
141     private boolean decodeWasNull;
142     private boolean first;
143     /**
144      * A bitmask where the bits are defined as
145      * <ul>
146      *     <li>{@link #STATE_INIT}</li>
147      *     <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
148      *     <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
149      * </ul>
150      */
151     private byte decodeState = STATE_INIT;
152     private int discardAfterReads = 16;
153     private int numReads;
154 
155     protected ByteToMessageDecoder() {
156         ensureNotSharable();
157     }
158 
159     /**
160      * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
161      * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
162      *
163      * Default is {@code false} as this has performance impacts.
164      */
165     public void setSingleDecode(boolean singleDecode) {
166         this.singleDecode = singleDecode;
167     }
168 
169     /**
170      * If {@code true} then only one message is decoded on each
171      * {@link #channelRead(ChannelHandlerContext, Object)} call.
172      *
173      * Default is {@code false} as this has performance impacts.
174      */
175     public boolean isSingleDecode() {
176         return singleDecode;
177     }
178 
179     /**
180      * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
181      */
182     public void setCumulator(Cumulator cumulator) {
183         if (cumulator == null) {
184             throw new NullPointerException("cumulator");
185         }
186         this.cumulator = cumulator;
187     }
188 
189     /**
190      * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
191      * The default is {@code 16}.
192      */
193     public void setDiscardAfterReads(int discardAfterReads) {
194         if (discardAfterReads <= 0) {
195             throw new IllegalArgumentException("discardAfterReads must be > 0");
196         }
197         this.discardAfterReads = discardAfterReads;
198     }
199 
200     /**
201      * Returns the actual number of readable bytes in the internal cumulative
202      * buffer of this decoder. You usually do not need to rely on this value
203      * to write a decoder. Use it only when you must use it at your own risk.
204      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
205      */
206     protected int actualReadableBytes() {
207         return internalBuffer().readableBytes();
208     }
209 
210     /**
211      * Returns the internal cumulative buffer of this decoder. You usually
212      * do not need to access the internal buffer directly to write a decoder.
213      * Use it only when you must use it at your own risk.
214      */
215     protected ByteBuf internalBuffer() {
216         if (cumulation != null) {
217             return cumulation;
218         } else {
219             return Unpooled.EMPTY_BUFFER;
220         }
221     }
222 
223     @Override
224     public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
225         if (decodeState == STATE_CALLING_CHILD_DECODE) {
226             decodeState = STATE_HANDLER_REMOVED_PENDING;
227             return;
228         }
229         ByteBuf buf = cumulation;
230         if (buf != null) {
231             // Directly set this to null so we are sure we not access it in any other method here anymore.
232             cumulation = null;
233 
234             int readable = buf.readableBytes();
235             if (readable > 0) {
236                 ByteBuf bytes = buf.readBytes(readable);
237                 buf.release();
238                 ctx.fireChannelRead(bytes);
239             } else {
240                 buf.release();
241             }
242 
243             numReads = 0;
244             ctx.fireChannelReadComplete();
245         }
246         handlerRemoved0(ctx);
247     }
248 
249     /**
250      * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
251      * events anymore.
252      */
253     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
254 
255     @Override
256     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
257         if (msg instanceof ByteBuf) {
258             CodecOutputList out = CodecOutputList.newInstance();
259             try {
260                 ByteBuf data = (ByteBuf) msg;
261                 first = cumulation == null;
262                 if (first) {
263                     cumulation = data;
264                 } else {
265                     cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
266                 }
267                 callDecode(ctx, cumulation, out);
268             } catch (DecoderException e) {
269                 throw e;
270             } catch (Exception e) {
271                 throw new DecoderException(e);
272             } finally {
273                 if (cumulation != null && !cumulation.isReadable()) {
274                     numReads = 0;
275                     cumulation.release();
276                     cumulation = null;
277                 } else if (++ numReads >= discardAfterReads) {
278                     // We did enough reads already try to discard some bytes so we not risk to see a OOME.
279                     // See https://github.com/netty/netty/issues/4275
280                     numReads = 0;
281                     discardSomeReadBytes();
282                 }
283 
284                 int size = out.size();
285                 decodeWasNull = !out.insertSinceRecycled();
286                 fireChannelRead(ctx, out, size);
287                 out.recycle();
288             }
289         } else {
290             ctx.fireChannelRead(msg);
291         }
292     }
293 
294     /**
295      * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
296      */
297     static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
298         if (msgs instanceof CodecOutputList) {
299             fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
300         } else {
301             for (int i = 0; i < numElements; i++) {
302                 ctx.fireChannelRead(msgs.get(i));
303             }
304         }
305     }
306 
307     /**
308      * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
309      */
310     static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
311         for (int i = 0; i < numElements; i ++) {
312             ctx.fireChannelRead(msgs.getUnsafe(i));
313         }
314     }
315 
316     @Override
317     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
318         numReads = 0;
319         discardSomeReadBytes();
320         if (decodeWasNull) {
321             decodeWasNull = false;
322             if (!ctx.channel().config().isAutoRead()) {
323                 ctx.read();
324             }
325         }
326         ctx.fireChannelReadComplete();
327     }
328 
329     protected final void discardSomeReadBytes() {
330         if (cumulation != null && !first && cumulation.refCnt() == 1) {
331             // discard some bytes if possible to make more room in the
332             // buffer but only if the refCnt == 1  as otherwise the user may have
333             // used slice().retain() or duplicate().retain().
334             //
335             // See:
336             // - https://github.com/netty/netty/issues/2327
337             // - https://github.com/netty/netty/issues/1764
338             cumulation.discardSomeReadBytes();
339         }
340     }
341 
342     @Override
343     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
344         channelInputClosed(ctx, true);
345     }
346 
347     @Override
348     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
349         if (evt instanceof ChannelInputShutdownEvent) {
350             // The decodeLast method is invoked when a channelInactive event is encountered.
351             // This method is responsible for ending requests in some situations and must be called
352             // when the input has been shutdown.
353             channelInputClosed(ctx, false);
354         }
355         super.userEventTriggered(ctx, evt);
356     }
357 
358     private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
359         CodecOutputList out = CodecOutputList.newInstance();
360         try {
361             channelInputClosed(ctx, out);
362         } catch (DecoderException e) {
363             throw e;
364         } catch (Exception e) {
365             throw new DecoderException(e);
366         } finally {
367             try {
368                 if (cumulation != null) {
369                     cumulation.release();
370                     cumulation = null;
371                 }
372                 int size = out.size();
373                 fireChannelRead(ctx, out, size);
374                 if (size > 0) {
375                     // Something was read, call fireChannelReadComplete()
376                     ctx.fireChannelReadComplete();
377                 }
378                 if (callChannelInactive) {
379                     ctx.fireChannelInactive();
380                 }
381             } finally {
382                 // Recycle in all cases
383                 out.recycle();
384             }
385         }
386     }
387 
388     /**
389      * Called when the input of the channel was closed which may be because it changed to inactive or because of
390      * {@link ChannelInputShutdownEvent}.
391      */
392     void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
393         if (cumulation != null) {
394             callDecode(ctx, cumulation, out);
395             decodeLast(ctx, cumulation, out);
396         } else {
397             decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
398         }
399     }
400 
401     /**
402      * Called once data should be decoded from the given {@link ByteBuf}. This method will call
403      * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
404      *
405      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
406      * @param in            the {@link ByteBuf} from which to read data
407      * @param out           the {@link List} to which decoded messages should be added
408      */
409     protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
410         try {
411             while (in.isReadable()) {
412                 int outSize = out.size();
413 
414                 if (outSize > 0) {
415                     fireChannelRead(ctx, out, outSize);
416                     out.clear();
417 
418                     // Check if this handler was removed before continuing with decoding.
419                     // If it was removed, it is not safe to continue to operate on the buffer.
420                     //
421                     // See:
422                     // - https://github.com/netty/netty/issues/4635
423                     if (ctx.isRemoved()) {
424                         break;
425                     }
426                     outSize = 0;
427                 }
428 
429                 int oldInputLength = in.readableBytes();
430                 decodeRemovalReentryProtection(ctx, in, out);
431 
432                 // Check if this handler was removed before continuing the loop.
433                 // If it was removed, it is not safe to continue to operate on the buffer.
434                 //
435                 // See https://github.com/netty/netty/issues/1664
436                 if (ctx.isRemoved()) {
437                     break;
438                 }
439 
440                 if (outSize == out.size()) {
441                     if (oldInputLength == in.readableBytes()) {
442                         break;
443                     } else {
444                         continue;
445                     }
446                 }
447 
448                 if (oldInputLength == in.readableBytes()) {
449                     throw new DecoderException(
450                             StringUtil.simpleClassName(getClass()) +
451                                     ".decode() did not read anything but decoded a message.");
452                 }
453 
454                 if (isSingleDecode()) {
455                     break;
456                 }
457             }
458         } catch (DecoderException e) {
459             throw e;
460         } catch (Exception cause) {
461             throw new DecoderException(cause);
462         }
463     }
464 
465     /**
466      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
467      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
468      * {@link ByteBuf}.
469      *
470      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
471      * @param in            the {@link ByteBuf} from which to read data
472      * @param out           the {@link List} to which decoded messages should be added
473      * @throws Exception    is thrown if an error occurs
474      */
475     protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
476 
477     /**
478      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
479      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
480      * {@link ByteBuf}.
481      *
482      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
483      * @param in            the {@link ByteBuf} from which to read data
484      * @param out           the {@link List} to which decoded messages should be added
485      * @throws Exception    is thrown if an error occurs
486      */
487     final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
488             throws Exception {
489         decodeState = STATE_CALLING_CHILD_DECODE;
490         try {
491             decode(ctx, in, out);
492         } finally {
493             boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
494             decodeState = STATE_INIT;
495             if (removePending) {
496                 handlerRemoved(ctx);
497             }
498         }
499     }
500 
501     /**
502      * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
503      * {@link #channelInactive(ChannelHandlerContext)} was triggered.
504      *
505      * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
506      * override this for some special cleanup operation.
507      */
508     protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
509         if (in.isReadable()) {
510             // Only call decode() if there is something left in the buffer to decode.
511             // See https://github.com/netty/netty/issues/4386
512             decodeRemovalReentryProtection(ctx, in, out);
513         }
514     }
515 
516     static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
517         ByteBuf oldCumulation = cumulation;
518         cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
519         cumulation.writeBytes(oldCumulation);
520         oldCumulation.release();
521         return cumulation;
522     }
523 
524     /**
525      * Cumulate {@link ByteBuf}s.
526      */
527     public interface Cumulator {
528         /**
529          * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
530          * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
531          * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
532          */
533         ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
534     }
535 }