View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelConfig;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInboundHandlerAdapter;
25  import io.netty.channel.socket.ChannelInputShutdownEvent;
26  import io.netty.util.IllegalReferenceCountException;
27  import io.netty.util.internal.ObjectUtil;
28  import io.netty.util.internal.StringUtil;
29  
30  import java.util.List;
31  
32  import static io.netty.util.internal.ObjectUtil.checkPositive;
33  import static java.lang.Integer.MAX_VALUE;
34  
35  /**
36   * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
37   * other Message type.
38   *
39   * For example here is an implementation which reads all readable bytes from
40   * the input {@link ByteBuf} and create a new {@link ByteBuf}.
41   *
42   * <pre>
43   *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
44   *         {@code @Override}
45   *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List&lt;Object&gt; out)
46   *                 throws {@link Exception} {
47   *             out.add(in.readBytes(in.readableBytes()));
48   *         }
49   *     }
50   * </pre>
51   *
52   * <h3>Frame detection</h3>
53   * <p>
54   * Generally frame detection should be handled earlier in the pipeline by adding a
55   * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
56   * or {@link LineBasedFrameDecoder}.
57   * <p>
58   * If a custom frame decoder is required, then one needs to be careful when implementing
59   * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
60   * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
61   * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
62   * <p>
63   * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
64   * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
65   * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
66   * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
67   * <h3>Pitfalls</h3>
68   * <p>
69   * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
70   * annotated with {@link @Sharable}.
71   * <p>
72   * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
73   * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
74   * to avoid leaking memory.
75   */
76  public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
77  
78      /**
79       * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
80       */
81      public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
82          @Override
83          public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
84              if (cumulation == in) {
85                  // when the in buffer is the same as the cumulation it is doubly retained, release it once
86                  in.release();
87                  return cumulation;
88              }
89              if (!cumulation.isReadable() && in.isContiguous()) {
90                  // If cumulation is empty and input buffer is contiguous, use it directly
91                  cumulation.release();
92                  return in;
93              }
94              try {
95                  final int required = in.readableBytes();
96                  if (required > cumulation.maxWritableBytes() ||
97                      required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 ||
98                      cumulation.isReadOnly()) {
99                      // Expand cumulation (by replacing it) under the following conditions:
100                     // - cumulation cannot be resized to accommodate the additional data
101                     // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
102                     //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
103                     return expandCumulation(alloc, cumulation, in);
104                 }
105                 cumulation.writeBytes(in, in.readerIndex(), required);
106                 in.readerIndex(in.writerIndex());
107                 return cumulation;
108             } finally {
109                 // We must release in all cases as otherwise it may produce a leak if writeBytes(...) throw
110                 // for whatever release (for example because of OutOfMemoryError)
111                 in.release();
112             }
113         }
114     };
115 
116     /**
117      * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
118      * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
119      * and the decoder implementation this may be slower than just use the {@link #MERGE_CUMULATOR}.
120      */
121     public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
122         @Override
123         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
124             if (cumulation == in) {
125                 // when the in buffer is the same as the cumulation it is doubly retained, release it once
126                 in.release();
127                 return cumulation;
128             }
129             if (!cumulation.isReadable()) {
130                 cumulation.release();
131                 return in;
132             }
133             CompositeByteBuf composite = null;
134             try {
135                 if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
136                     composite = (CompositeByteBuf) cumulation;
137                     // Writer index must equal capacity if we are going to "write"
138                     // new components to the end
139                     if (composite.writerIndex() != composite.capacity()) {
140                         composite.capacity(composite.writerIndex());
141                     }
142                 } else {
143                     composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
144                 }
145                 composite.addFlattenedComponents(true, in);
146                 in = null;
147                 return composite;
148             } finally {
149                 if (in != null) {
150                     // We must release if the ownership was not transferred as otherwise it may produce a leak
151                     in.release();
152                     // Also release any new buffer allocated if we're not returning it
153                     if (composite != null && composite != cumulation) {
154                         composite.release();
155                     }
156                 }
157             }
158         }
159     };
160 
161     private static final byte STATE_INIT = 0;
162     private static final byte STATE_CALLING_CHILD_DECODE = 1;
163     private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
164 
165     ByteBuf cumulation;
166     private Cumulator cumulator = MERGE_CUMULATOR;
167     private boolean singleDecode;
168     private boolean first;
169 
170     /**
171      * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
172      * when {@link ChannelConfig#isAutoRead()} is {@code false}.
173      */
174     private boolean firedChannelRead;
175 
176     private boolean selfFiredChannelRead;
177 
178     /**
179      * A bitmask where the bits are defined as
180      * <ul>
181      *     <li>{@link #STATE_INIT}</li>
182      *     <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
183      *     <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
184      * </ul>
185      */
186     private byte decodeState = STATE_INIT;
187     private int discardAfterReads = 16;
188     private int numReads;
189 
190     protected ByteToMessageDecoder() {
191         ensureNotSharable();
192     }
193 
194     /**
195      * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
196      * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
197      *
198      * Default is {@code false} as this has performance impacts.
199      */
200     public void setSingleDecode(boolean singleDecode) {
201         this.singleDecode = singleDecode;
202     }
203 
204     /**
205      * If {@code true} then only one message is decoded on each
206      * {@link #channelRead(ChannelHandlerContext, Object)} call.
207      *
208      * Default is {@code false} as this has performance impacts.
209      */
210     public boolean isSingleDecode() {
211         return singleDecode;
212     }
213 
214     /**
215      * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
216      */
217     public void setCumulator(Cumulator cumulator) {
218         this.cumulator = ObjectUtil.checkNotNull(cumulator, "cumulator");
219     }
220 
221     /**
222      * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
223      * The default is {@code 16}.
224      */
225     public void setDiscardAfterReads(int discardAfterReads) {
226         checkPositive(discardAfterReads, "discardAfterReads");
227         this.discardAfterReads = discardAfterReads;
228     }
229 
230     /**
231      * Returns the actual number of readable bytes in the internal cumulative
232      * buffer of this decoder. You usually do not need to rely on this value
233      * to write a decoder. Use it only when you must use it at your own risk.
234      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
235      */
236     protected int actualReadableBytes() {
237         return internalBuffer().readableBytes();
238     }
239 
240     /**
241      * Returns the internal cumulative buffer of this decoder. You usually
242      * do not need to access the internal buffer directly to write a decoder.
243      * Use it only when you must use it at your own risk.
244      */
245     protected ByteBuf internalBuffer() {
246         if (cumulation != null) {
247             return cumulation;
248         } else {
249             return Unpooled.EMPTY_BUFFER;
250         }
251     }
252 
253     @Override
254     public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
255         if (decodeState == STATE_CALLING_CHILD_DECODE) {
256             decodeState = STATE_HANDLER_REMOVED_PENDING;
257             return;
258         }
259         ByteBuf buf = cumulation;
260         if (buf != null) {
261             // Directly set this to null, so we are sure we not access it in any other method here anymore.
262             cumulation = null;
263             numReads = 0;
264             int readable = buf.readableBytes();
265             if (readable > 0) {
266                 ctx.fireChannelRead(buf);
267                 ctx.fireChannelReadComplete();
268             } else {
269                 buf.release();
270             }
271         }
272         handlerRemoved0(ctx);
273     }
274 
275     /**
276      * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
277      * events anymore.
278      */
279     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
280 
281     @Override
282     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
283         if (msg instanceof ByteBuf) {
284             selfFiredChannelRead = true;
285             CodecOutputList out = CodecOutputList.newInstance();
286             try {
287                 first = cumulation == null;
288                 cumulation = cumulator.cumulate(ctx.alloc(),
289                         first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
290                 callDecode(ctx, cumulation, out);
291             } catch (DecoderException e) {
292                 throw e;
293             } catch (Exception e) {
294                 throw new DecoderException(e);
295             } finally {
296                 try {
297                     if (cumulation != null && !cumulation.isReadable()) {
298                         numReads = 0;
299                         try {
300                             cumulation.release();
301                         } catch (IllegalReferenceCountException e) {
302                             //noinspection ThrowFromFinallyBlock
303                             throw new IllegalReferenceCountException(
304                                     getClass().getSimpleName() + "#decode() might have released its input buffer, " +
305                                             "or passed it down the pipeline without a retain() call, " +
306                                             "which is not allowed.", e);
307                         }
308                         cumulation = null;
309                     } else if (++numReads >= discardAfterReads) {
310                         // We did enough reads already try to discard some bytes, so we not risk to see a OOME.
311                         // See https://github.com/netty/netty/issues/4275
312                         numReads = 0;
313                         discardSomeReadBytes();
314                     }
315 
316                     int size = out.size();
317                     firedChannelRead |= out.insertSinceRecycled();
318                     fireChannelRead(ctx, out, size);
319                 } finally {
320                     out.recycle();
321                 }
322             }
323         } else {
324             ctx.fireChannelRead(msg);
325         }
326     }
327 
328     /**
329      * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
330      */
331     static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
332         if (msgs instanceof CodecOutputList) {
333             fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
334         } else {
335             for (int i = 0; i < numElements; i++) {
336                 ctx.fireChannelRead(msgs.get(i));
337             }
338         }
339     }
340 
341     /**
342      * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
343      */
344     static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
345         for (int i = 0; i < numElements; i ++) {
346             ctx.fireChannelRead(msgs.getUnsafe(i));
347         }
348     }
349 
350     @Override
351     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
352         numReads = 0;
353         discardSomeReadBytes();
354         if (selfFiredChannelRead && !firedChannelRead && !ctx.channel().config().isAutoRead()) {
355             ctx.read();
356         }
357         firedChannelRead = false;
358         selfFiredChannelRead = false;
359         ctx.fireChannelReadComplete();
360     }
361 
362     protected final void discardSomeReadBytes() {
363         if (cumulation != null && !first && cumulation.refCnt() == 1) {
364             // discard some bytes if possible to make more room in the
365             // buffer but only if the refCnt == 1  as otherwise the user may have
366             // used slice().retain() or duplicate().retain().
367             //
368             // See:
369             // - https://github.com/netty/netty/issues/2327
370             // - https://github.com/netty/netty/issues/1764
371             cumulation.discardSomeReadBytes();
372         }
373     }
374 
375     @Override
376     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
377         channelInputClosed(ctx, true);
378     }
379 
380     @Override
381     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
382         if (evt instanceof ChannelInputShutdownEvent) {
383             // The decodeLast method is invoked when a channelInactive event is encountered.
384             // This method is responsible for ending requests in some situations and must be called
385             // when the input has been shutdown.
386             channelInputClosed(ctx, false);
387         }
388         super.userEventTriggered(ctx, evt);
389     }
390 
391     private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
392         CodecOutputList out = CodecOutputList.newInstance();
393         try {
394             channelInputClosed(ctx, out);
395         } catch (DecoderException e) {
396             throw e;
397         } catch (Exception e) {
398             throw new DecoderException(e);
399         } finally {
400             try {
401                 if (cumulation != null) {
402                     cumulation.release();
403                     cumulation = null;
404                 }
405                 int size = out.size();
406                 fireChannelRead(ctx, out, size);
407                 if (size > 0) {
408                     // Something was read, call fireChannelReadComplete()
409                     ctx.fireChannelReadComplete();
410                 }
411                 if (callChannelInactive) {
412                     ctx.fireChannelInactive();
413                 }
414             } finally {
415                 // Recycle in all cases
416                 out.recycle();
417             }
418         }
419     }
420 
421     /**
422      * Called when the input of the channel was closed which may be because it changed to inactive or because of
423      * {@link ChannelInputShutdownEvent}.
424      */
425     void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
426         if (cumulation != null) {
427             callDecode(ctx, cumulation, out);
428             // If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would
429             // be unexpected.
430             if (!ctx.isRemoved()) {
431                 // Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...).
432                 // See https://github.com/netty/netty/issues/10802.
433                 ByteBuf buffer = cumulation == null ? Unpooled.EMPTY_BUFFER : cumulation;
434                 decodeLast(ctx, buffer, out);
435             }
436         } else {
437             decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
438         }
439     }
440 
441     /**
442      * Called once data should be decoded from the given {@link ByteBuf}. This method will call
443      * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
444      *
445      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
446      * @param in            the {@link ByteBuf} from which to read data
447      * @param out           the {@link List} to which decoded messages should be added
448      */
449     protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
450         try {
451             while (in.isReadable()) {
452                 final int outSize = out.size();
453 
454                 if (outSize > 0) {
455                     fireChannelRead(ctx, out, outSize);
456                     out.clear();
457 
458                     // Check if this handler was removed before continuing with decoding.
459                     // If it was removed, it is not safe to continue to operate on the buffer.
460                     //
461                     // See:
462                     // - https://github.com/netty/netty/issues/4635
463                     if (ctx.isRemoved()) {
464                         break;
465                     }
466                 }
467 
468                 int oldInputLength = in.readableBytes();
469                 decodeRemovalReentryProtection(ctx, in, out);
470 
471                 // Check if this handler was removed before continuing the loop.
472                 // If it was removed, it is not safe to continue to operate on the buffer.
473                 //
474                 // See https://github.com/netty/netty/issues/1664
475                 if (ctx.isRemoved()) {
476                     break;
477                 }
478 
479                 if (out.isEmpty()) {
480                     if (oldInputLength == in.readableBytes()) {
481                         break;
482                     } else {
483                         continue;
484                     }
485                 }
486 
487                 if (oldInputLength == in.readableBytes()) {
488                     throw new DecoderException(
489                             StringUtil.simpleClassName(getClass()) +
490                                     ".decode() did not read anything but decoded a message.");
491                 }
492 
493                 if (isSingleDecode()) {
494                     break;
495                 }
496             }
497         } catch (DecoderException e) {
498             throw e;
499         } catch (Exception cause) {
500             throw new DecoderException(cause);
501         }
502     }
503 
504     /**
505      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
506      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
507      * {@link ByteBuf}.
508      *
509      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
510      * @param in            the {@link ByteBuf} from which to read data
511      * @param out           the {@link List} to which decoded messages should be added
512      * @throws Exception    is thrown if an error occurs
513      */
514     protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
515 
516     /**
517      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
518      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
519      * {@link ByteBuf}.
520      *
521      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
522      * @param in            the {@link ByteBuf} from which to read data
523      * @param out           the {@link List} to which decoded messages should be added
524      * @throws Exception    is thrown if an error occurs
525      */
526     final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
527             throws Exception {
528         decodeState = STATE_CALLING_CHILD_DECODE;
529         try {
530             decode(ctx, in, out);
531         } finally {
532             boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
533             decodeState = STATE_INIT;
534             if (removePending) {
535                 fireChannelRead(ctx, out, out.size());
536                 out.clear();
537                 handlerRemoved(ctx);
538             }
539         }
540     }
541 
542     /**
543      * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
544      * {@link #channelInactive(ChannelHandlerContext)} was triggered.
545      *
546      * By default, this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
547      * override this for some special cleanup operation.
548      */
549     protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
550         if (in.isReadable()) {
551             // Only call decode() if there is something left in the buffer to decode.
552             // See https://github.com/netty/netty/issues/4386
553             decodeRemovalReentryProtection(ctx, in, out);
554         }
555     }
556 
557     static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
558         int oldBytes = oldCumulation.readableBytes();
559         int newBytes = in.readableBytes();
560         int totalBytes = oldBytes + newBytes;
561         ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE));
562         ByteBuf toRelease = newCumulation;
563         try {
564             // This avoids redundant checks and stack depth compared to calling writeBytes(...)
565             newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
566                 .setBytes(oldBytes, in, in.readerIndex(), newBytes)
567                 .writerIndex(totalBytes);
568             in.readerIndex(in.writerIndex());
569             toRelease = oldCumulation;
570             return newCumulation;
571         } finally {
572             toRelease.release();
573         }
574     }
575 
576     /**
577      * Cumulate {@link ByteBuf}s.
578      */
579     public interface Cumulator {
580         /**
581          * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
582          * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
583          * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
584          */
585         ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
586     }
587 }