View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelConfig;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInboundHandlerAdapter;
25  import io.netty.channel.socket.ChannelInputShutdownEvent;
26  import io.netty.util.internal.ObjectUtil;
27  import io.netty.util.internal.StringUtil;
28  
29  import java.util.List;
30  
31  import static io.netty.util.internal.ObjectUtil.checkPositive;
32  import static java.lang.Integer.MAX_VALUE;
33  
34  /**
35   * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
36   * other Message type.
37   *
38   * For example here is an implementation which reads all readable bytes from
39   * the input {@link ByteBuf} and create a new {@link ByteBuf}.
40   *
41   * <pre>
42   *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
43   *         {@code @Override}
44   *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List&lt;Object&gt; out)
45   *                 throws {@link Exception} {
46   *             out.add(in.readBytes(in.readableBytes()));
47   *         }
48   *     }
49   * </pre>
50   *
51   * <h3>Frame detection</h3>
52   * <p>
53   * Generally frame detection should be handled earlier in the pipeline by adding a
54   * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
55   * or {@link LineBasedFrameDecoder}.
56   * <p>
57   * If a custom frame decoder is required, then one needs to be careful when implementing
58   * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
59   * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
60   * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
61   * <p>
62   * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
63   * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
64   * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
65   * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
66   * <h3>Pitfalls</h3>
67   * <p>
68   * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
69   * annotated with {@link @Sharable}.
70   * <p>
71   * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
72   * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
73   * to avoid leaking memory.
74   */
75  public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
76  
77      /**
78       * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
79       */
80      public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
81          @Override
82          public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
83              if (!cumulation.isReadable() && in.isContiguous()) {
84                  // If cumulation is empty and input buffer is contiguous, use it directly
85                  cumulation.release();
86                  return in;
87              }
88              try {
89                  final int required = in.readableBytes();
90                  if (required > cumulation.maxWritableBytes() ||
91                      required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 ||
92                      cumulation.isReadOnly()) {
93                      // Expand cumulation (by replacing it) under the following conditions:
94                      // - cumulation cannot be resized to accommodate the additional data
95                      // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
96                      //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
97                      return expandCumulation(alloc, cumulation, in);
98                  }
99                  cumulation.writeBytes(in, in.readerIndex(), required);
100                 in.readerIndex(in.writerIndex());
101                 return cumulation;
102             } finally {
103                 // We must release in all cases as otherwise it may produce a leak if writeBytes(...) throw
104                 // for whatever release (for example because of OutOfMemoryError)
105                 in.release();
106             }
107         }
108     };
109 
110     /**
111      * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
112      * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
113      * and the decoder implementation this may be slower than just use the {@link #MERGE_CUMULATOR}.
114      */
115     public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
116         @Override
117         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
118             if (!cumulation.isReadable()) {
119                 cumulation.release();
120                 return in;
121             }
122             CompositeByteBuf composite = null;
123             try {
124                 if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
125                     composite = (CompositeByteBuf) cumulation;
126                     // Writer index must equal capacity if we are going to "write"
127                     // new components to the end
128                     if (composite.writerIndex() != composite.capacity()) {
129                         composite.capacity(composite.writerIndex());
130                     }
131                 } else {
132                     composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
133                 }
134                 composite.addFlattenedComponents(true, in);
135                 in = null;
136                 return composite;
137             } finally {
138                 if (in != null) {
139                     // We must release if the ownership was not transferred as otherwise it may produce a leak
140                     in.release();
141                     // Also release any new buffer allocated if we're not returning it
142                     if (composite != null && composite != cumulation) {
143                         composite.release();
144                     }
145                 }
146             }
147         }
148     };
149 
150     private static final byte STATE_INIT = 0;
151     private static final byte STATE_CALLING_CHILD_DECODE = 1;
152     private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
153 
154     ByteBuf cumulation;
155     private Cumulator cumulator = MERGE_CUMULATOR;
156     private boolean singleDecode;
157     private boolean first;
158 
159     /**
160      * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
161      * when {@link ChannelConfig#isAutoRead()} is {@code false}.
162      */
163     private boolean firedChannelRead;
164 
165     private boolean selfFiredChannelRead;
166 
167     /**
168      * A bitmask where the bits are defined as
169      * <ul>
170      *     <li>{@link #STATE_INIT}</li>
171      *     <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
172      *     <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
173      * </ul>
174      */
175     private byte decodeState = STATE_INIT;
176     private int discardAfterReads = 16;
177     private int numReads;
178 
179     protected ByteToMessageDecoder() {
180         ensureNotSharable();
181     }
182 
183     /**
184      * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
185      * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
186      *
187      * Default is {@code false} as this has performance impacts.
188      */
189     public void setSingleDecode(boolean singleDecode) {
190         this.singleDecode = singleDecode;
191     }
192 
193     /**
194      * If {@code true} then only one message is decoded on each
195      * {@link #channelRead(ChannelHandlerContext, Object)} call.
196      *
197      * Default is {@code false} as this has performance impacts.
198      */
199     public boolean isSingleDecode() {
200         return singleDecode;
201     }
202 
203     /**
204      * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
205      */
206     public void setCumulator(Cumulator cumulator) {
207         this.cumulator = ObjectUtil.checkNotNull(cumulator, "cumulator");
208     }
209 
210     /**
211      * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
212      * The default is {@code 16}.
213      */
214     public void setDiscardAfterReads(int discardAfterReads) {
215         checkPositive(discardAfterReads, "discardAfterReads");
216         this.discardAfterReads = discardAfterReads;
217     }
218 
219     /**
220      * Returns the actual number of readable bytes in the internal cumulative
221      * buffer of this decoder. You usually do not need to rely on this value
222      * to write a decoder. Use it only when you must use it at your own risk.
223      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
224      */
225     protected int actualReadableBytes() {
226         return internalBuffer().readableBytes();
227     }
228 
229     /**
230      * Returns the internal cumulative buffer of this decoder. You usually
231      * do not need to access the internal buffer directly to write a decoder.
232      * Use it only when you must use it at your own risk.
233      */
234     protected ByteBuf internalBuffer() {
235         if (cumulation != null) {
236             return cumulation;
237         } else {
238             return Unpooled.EMPTY_BUFFER;
239         }
240     }
241 
242     @Override
243     public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
244         if (decodeState == STATE_CALLING_CHILD_DECODE) {
245             decodeState = STATE_HANDLER_REMOVED_PENDING;
246             return;
247         }
248         ByteBuf buf = cumulation;
249         if (buf != null) {
250             // Directly set this to null, so we are sure we not access it in any other method here anymore.
251             cumulation = null;
252             numReads = 0;
253             int readable = buf.readableBytes();
254             if (readable > 0) {
255                 ctx.fireChannelRead(buf);
256                 ctx.fireChannelReadComplete();
257             } else {
258                 buf.release();
259             }
260         }
261         handlerRemoved0(ctx);
262     }
263 
264     /**
265      * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
266      * events anymore.
267      */
268     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
269 
270     @Override
271     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
272         if (msg instanceof ByteBuf) {
273             selfFiredChannelRead = true;
274             CodecOutputList out = CodecOutputList.newInstance();
275             try {
276                 first = cumulation == null;
277                 cumulation = cumulator.cumulate(ctx.alloc(),
278                         first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
279                 callDecode(ctx, cumulation, out);
280             } catch (DecoderException e) {
281                 throw e;
282             } catch (Exception e) {
283                 throw new DecoderException(e);
284             } finally {
285                 try {
286                     if (cumulation != null && !cumulation.isReadable()) {
287                         numReads = 0;
288                         cumulation.release();
289                         cumulation = null;
290                     } else if (++numReads >= discardAfterReads) {
291                         // We did enough reads already try to discard some bytes, so we not risk to see a OOME.
292                         // See https://github.com/netty/netty/issues/4275
293                         numReads = 0;
294                         discardSomeReadBytes();
295                     }
296 
297                     int size = out.size();
298                     firedChannelRead |= out.insertSinceRecycled();
299                     fireChannelRead(ctx, out, size);
300                 } finally {
301                     out.recycle();
302                 }
303             }
304         } else {
305             ctx.fireChannelRead(msg);
306         }
307     }
308 
309     /**
310      * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
311      */
312     static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
313         if (msgs instanceof CodecOutputList) {
314             fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
315         } else {
316             for (int i = 0; i < numElements; i++) {
317                 ctx.fireChannelRead(msgs.get(i));
318             }
319         }
320     }
321 
322     /**
323      * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
324      */
325     static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
326         for (int i = 0; i < numElements; i ++) {
327             ctx.fireChannelRead(msgs.getUnsafe(i));
328         }
329     }
330 
331     @Override
332     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
333         numReads = 0;
334         discardSomeReadBytes();
335         if (selfFiredChannelRead && !firedChannelRead && !ctx.channel().config().isAutoRead()) {
336             ctx.read();
337         }
338         firedChannelRead = false;
339         ctx.fireChannelReadComplete();
340     }
341 
342     protected final void discardSomeReadBytes() {
343         if (cumulation != null && !first && cumulation.refCnt() == 1) {
344             // discard some bytes if possible to make more room in the
345             // buffer but only if the refCnt == 1  as otherwise the user may have
346             // used slice().retain() or duplicate().retain().
347             //
348             // See:
349             // - https://github.com/netty/netty/issues/2327
350             // - https://github.com/netty/netty/issues/1764
351             cumulation.discardSomeReadBytes();
352         }
353     }
354 
355     @Override
356     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
357         channelInputClosed(ctx, true);
358     }
359 
360     @Override
361     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
362         if (evt instanceof ChannelInputShutdownEvent) {
363             // The decodeLast method is invoked when a channelInactive event is encountered.
364             // This method is responsible for ending requests in some situations and must be called
365             // when the input has been shutdown.
366             channelInputClosed(ctx, false);
367         }
368         super.userEventTriggered(ctx, evt);
369     }
370 
371     private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
372         CodecOutputList out = CodecOutputList.newInstance();
373         try {
374             channelInputClosed(ctx, out);
375         } catch (DecoderException e) {
376             throw e;
377         } catch (Exception e) {
378             throw new DecoderException(e);
379         } finally {
380             try {
381                 if (cumulation != null) {
382                     cumulation.release();
383                     cumulation = null;
384                 }
385                 int size = out.size();
386                 fireChannelRead(ctx, out, size);
387                 if (size > 0) {
388                     // Something was read, call fireChannelReadComplete()
389                     ctx.fireChannelReadComplete();
390                 }
391                 if (callChannelInactive) {
392                     ctx.fireChannelInactive();
393                 }
394             } finally {
395                 // Recycle in all cases
396                 out.recycle();
397             }
398         }
399     }
400 
401     /**
402      * Called when the input of the channel was closed which may be because it changed to inactive or because of
403      * {@link ChannelInputShutdownEvent}.
404      */
405     void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
406         if (cumulation != null) {
407             callDecode(ctx, cumulation, out);
408             // If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would
409             // be unexpected.
410             if (!ctx.isRemoved()) {
411                 // Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...).
412                 // See https://github.com/netty/netty/issues/10802.
413                 ByteBuf buffer = cumulation == null ? Unpooled.EMPTY_BUFFER : cumulation;
414                 decodeLast(ctx, buffer, out);
415             }
416         } else {
417             decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
418         }
419     }
420 
421     /**
422      * Called once data should be decoded from the given {@link ByteBuf}. This method will call
423      * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
424      *
425      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
426      * @param in            the {@link ByteBuf} from which to read data
427      * @param out           the {@link List} to which decoded messages should be added
428      */
429     protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
430         try {
431             while (in.isReadable()) {
432                 final int outSize = out.size();
433 
434                 if (outSize > 0) {
435                     fireChannelRead(ctx, out, outSize);
436                     out.clear();
437 
438                     // Check if this handler was removed before continuing with decoding.
439                     // If it was removed, it is not safe to continue to operate on the buffer.
440                     //
441                     // See:
442                     // - https://github.com/netty/netty/issues/4635
443                     if (ctx.isRemoved()) {
444                         break;
445                     }
446                 }
447 
448                 int oldInputLength = in.readableBytes();
449                 decodeRemovalReentryProtection(ctx, in, out);
450 
451                 // Check if this handler was removed before continuing the loop.
452                 // If it was removed, it is not safe to continue to operate on the buffer.
453                 //
454                 // See https://github.com/netty/netty/issues/1664
455                 if (ctx.isRemoved()) {
456                     break;
457                 }
458 
459                 if (out.isEmpty()) {
460                     if (oldInputLength == in.readableBytes()) {
461                         break;
462                     } else {
463                         continue;
464                     }
465                 }
466 
467                 if (oldInputLength == in.readableBytes()) {
468                     throw new DecoderException(
469                             StringUtil.simpleClassName(getClass()) +
470                                     ".decode() did not read anything but decoded a message.");
471                 }
472 
473                 if (isSingleDecode()) {
474                     break;
475                 }
476             }
477         } catch (DecoderException e) {
478             throw e;
479         } catch (Exception cause) {
480             throw new DecoderException(cause);
481         }
482     }
483 
484     /**
485      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
486      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
487      * {@link ByteBuf}.
488      *
489      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
490      * @param in            the {@link ByteBuf} from which to read data
491      * @param out           the {@link List} to which decoded messages should be added
492      * @throws Exception    is thrown if an error occurs
493      */
494     protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
495 
496     /**
497      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
498      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
499      * {@link ByteBuf}.
500      *
501      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
502      * @param in            the {@link ByteBuf} from which to read data
503      * @param out           the {@link List} to which decoded messages should be added
504      * @throws Exception    is thrown if an error occurs
505      */
506     final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
507             throws Exception {
508         decodeState = STATE_CALLING_CHILD_DECODE;
509         try {
510             decode(ctx, in, out);
511         } finally {
512             boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
513             decodeState = STATE_INIT;
514             if (removePending) {
515                 fireChannelRead(ctx, out, out.size());
516                 out.clear();
517                 handlerRemoved(ctx);
518             }
519         }
520     }
521 
522     /**
523      * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
524      * {@link #channelInactive(ChannelHandlerContext)} was triggered.
525      *
526      * By default, this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
527      * override this for some special cleanup operation.
528      */
529     protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
530         if (in.isReadable()) {
531             // Only call decode() if there is something left in the buffer to decode.
532             // See https://github.com/netty/netty/issues/4386
533             decodeRemovalReentryProtection(ctx, in, out);
534         }
535     }
536 
537     static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
538         int oldBytes = oldCumulation.readableBytes();
539         int newBytes = in.readableBytes();
540         int totalBytes = oldBytes + newBytes;
541         ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE));
542         ByteBuf toRelease = newCumulation;
543         try {
544             // This avoids redundant checks and stack depth compared to calling writeBytes(...)
545             newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
546                 .setBytes(oldBytes, in, in.readerIndex(), newBytes)
547                 .writerIndex(totalBytes);
548             in.readerIndex(in.writerIndex());
549             toRelease = oldCumulation;
550             return newCumulation;
551         } finally {
552             toRelease.release();
553         }
554     }
555 
556     /**
557      * Cumulate {@link ByteBuf}s.
558      */
559     public interface Cumulator {
560         /**
561          * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
562          * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
563          * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
564          */
565         ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
566     }
567 }