View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.CompositeByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelConfig;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInboundHandlerAdapter;
25  import io.netty.channel.socket.ChannelInputShutdownEvent;
26  import io.netty.util.IllegalReferenceCountException;
27  import io.netty.util.internal.ObjectUtil;
28  import io.netty.util.internal.StringUtil;
29  
30  import java.util.ArrayDeque;
31  import java.util.List;
32  import java.util.Queue;
33  
34  import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
35  import static io.netty.util.internal.ObjectUtil.checkPositive;
36  
37  /**
38   * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to
39   * another Message type.
40   * <p>
41   * For example here is an implementation which reads all readable bytes from
42   * the input {@link ByteBuf} and create a new {@link ByteBuf}.
43   *
44   * <pre>
45   *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
46   *         {@code @Override}
47   *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List&lt;Object&gt; out)
48   *                 throws {@link Exception} {
49   *             out.add(in.readBytes(in.readableBytes()));
50   *         }
51   *     }
52   * </pre>
53   *
54   * <h3>Frame detection</h3>
55   * <p>
56   * Generally frame detection should be handled earlier in the pipeline by adding a
57   * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
58   * or {@link LineBasedFrameDecoder}.
59   * <p>
60   * If a custom frame decoder is required, then one needs to be careful when implementing
61   * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
62   * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
63   * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
64   * <p>
65   * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
66   * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
67   * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
68   * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
69   * <h3>Pitfalls</h3>
70   * <p>
71   * Be aware that subclasses of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
72   * annotated with {@link @Sharable}.
73   * <p>
74   * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
75   * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
76   * to avoid leaking memory.
77   */
78  public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
79  
80      /**
81       * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
82       */
83      public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
84          @Override
85          public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
86              if (cumulation == in) {
87                  // when the in buffer is the same as the cumulation it is doubly retained, release it once
88                  in.release();
89                  return cumulation;
90              }
91              if (!cumulation.isReadable() && in.isContiguous()) {
92                  // If cumulation is empty and input buffer is contiguous, use it directly
93                  cumulation.release();
94                  return in;
95              }
96              try {
97                  final int required = in.readableBytes();
98                  if (required > cumulation.maxWritableBytes() ||
99                      required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 ||
100                     cumulation.isReadOnly()) {
101                     // Expand cumulation (by replacing it) under the following conditions:
102                     // - cumulation cannot be resized to accommodate the additional data
103                     // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
104                     //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
105                     return expandCumulation(alloc, cumulation, in);
106                 }
107                 cumulation.writeBytes(in, in.readerIndex(), required);
108                 in.readerIndex(in.writerIndex());
109                 return cumulation;
110             } finally {
111                 // We must release in all cases as otherwise it may produce a leak if writeBytes(...) throw
112                 // for whatever release (for example because of OutOfMemoryError)
113                 in.release();
114             }
115         }
116     };
117 
118     /**
119      * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
120      * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
121      * and the decoder implementation this may be slower than just use the {@link #MERGE_CUMULATOR}.
122      */
123     public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
124         @Override
125         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
126             if (cumulation == in) {
127                 // when the in buffer is the same as the cumulation it is doubly retained, release it once
128                 in.release();
129                 return cumulation;
130             }
131             if (!cumulation.isReadable()) {
132                 cumulation.release();
133                 return in;
134             }
135             CompositeByteBuf composite = null;
136             try {
137                 if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
138                     composite = (CompositeByteBuf) cumulation;
139                     // Writer index must equal capacity if we are going to "write"
140                     // new components to the end
141                     if (composite.writerIndex() != composite.capacity()) {
142                         composite.capacity(composite.writerIndex());
143                     }
144                 } else {
145                     composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
146                 }
147                 composite.addFlattenedComponents(true, in);
148                 in = null;
149                 return composite;
150             } finally {
151                 if (in != null) {
152                     // We must release if the ownership was not transferred as otherwise it may produce a leak
153                     in.release();
154                     // Also release any new buffer allocated if we're not returning it
155                     if (composite != null && composite != cumulation) {
156                         composite.release();
157                     }
158                 }
159             }
160         }
161     };
162 
163     private static final byte STATE_INIT = 0;
164     private static final byte STATE_CALLING_CHILD_DECODE = 1;
165     private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
166 
167     // Used to guard the inputs for reentrant channelRead calls
168     private Queue<Object> inputMessages;
169     ByteBuf cumulation;
170     private Cumulator cumulator = MERGE_CUMULATOR;
171     private boolean singleDecode;
172     private boolean first;
173 
174     /**
175      * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
176      * when {@link ChannelConfig#isAutoRead()} is {@code false}.
177      */
178     private boolean firedChannelRead;
179 
180     private boolean selfFiredChannelRead;
181 
182     /**
183      * A bitmask where the bits are defined as
184      * <ul>
185      *     <li>{@link #STATE_INIT}</li>
186      *     <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
187      *     <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
188      * </ul>
189      */
190     private byte decodeState = STATE_INIT;
191     private int discardAfterReads = 16;
192     private int numReads;
193 
194     protected ByteToMessageDecoder() {
195         ensureNotSharable();
196     }
197 
198     /**
199      * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
200      * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
201      *
202      * Default is {@code false} as this has performance impacts.
203      */
204     public void setSingleDecode(boolean singleDecode) {
205         this.singleDecode = singleDecode;
206     }
207 
208     /**
209      * If {@code true} then only one message is decoded on each
210      * {@link #channelRead(ChannelHandlerContext, Object)} call.
211      *
212      * Default is {@code false} as this has performance impacts.
213      */
214     public boolean isSingleDecode() {
215         return singleDecode;
216     }
217 
218     /**
219      * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
220      */
221     public void setCumulator(Cumulator cumulator) {
222         this.cumulator = ObjectUtil.checkNotNull(cumulator, "cumulator");
223     }
224 
225     /**
226      * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
227      * The default is {@code 16}.
228      */
229     public void setDiscardAfterReads(int discardAfterReads) {
230         checkPositive(discardAfterReads, "discardAfterReads");
231         this.discardAfterReads = discardAfterReads;
232     }
233 
234     /**
235      * Returns the actual number of readable bytes in the internal cumulative
236      * buffer of this decoder. You usually do not need to rely on this value
237      * to write a decoder. Use it only when you must use it at your own risk.
238      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
239      */
240     protected int actualReadableBytes() {
241         return internalBuffer().readableBytes();
242     }
243 
244     /**
245      * Returns the internal cumulative buffer of this decoder. You usually
246      * do not need to access the internal buffer directly to write a decoder.
247      * Use it only when you must use it at your own risk.
248      */
249     protected ByteBuf internalBuffer() {
250         if (cumulation != null) {
251             return cumulation;
252         } else {
253             return Unpooled.EMPTY_BUFFER;
254         }
255     }
256 
257     @Override
258     public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
259         if (decodeState == STATE_CALLING_CHILD_DECODE) {
260             decodeState = STATE_HANDLER_REMOVED_PENDING;
261             return;
262         }
263         ByteBuf buf = cumulation;
264         if (buf != null) {
265             // Directly set this to null, so we are sure we not access it in any other method here anymore.
266             cumulation = null;
267             numReads = 0;
268             int readable = buf.readableBytes();
269             if (readable > 0) {
270                 ctx.fireChannelRead(buf);
271                 ctx.fireChannelReadComplete();
272             } else {
273                 buf.release();
274             }
275         }
276         handlerRemoved0(ctx);
277     }
278 
279     /**
280      * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
281      * events anymore.
282      */
283     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
284 
285     @Override
286     public void channelRead(ChannelHandlerContext ctx, Object input) throws Exception {
287         if (decodeState == STATE_INIT) {
288             do {
289                 if (input instanceof ByteBuf) {
290                     selfFiredChannelRead = true;
291                     CodecOutputList out = CodecOutputList.newInstance();
292                     try {
293                         first = cumulation == null;
294                         cumulation = cumulator.cumulate(ctx.alloc(),
295                                 first ? EMPTY_BUFFER : cumulation, (ByteBuf) input);
296                         callDecode(ctx, cumulation, out);
297                     } catch (DecoderException e) {
298                         throw e;
299                     } catch (Exception e) {
300                         throw new DecoderException(e);
301                     } finally {
302                         try {
303                             if (cumulation != null && !cumulation.isReadable()) {
304                                 numReads = 0;
305                                 try {
306                                     cumulation.release();
307                                 } catch (IllegalReferenceCountException e) {
308                                     //noinspection ThrowFromFinallyBlock
309                                     throw new IllegalReferenceCountException(
310                                             getClass().getSimpleName() +
311                                                     "#decode() might have released its input buffer, " +
312                                                     "or passed it down the pipeline without a retain() call, " +
313                                                     "which is not allowed.", e);
314                                 }
315                                 cumulation = null;
316                             } else if (++numReads >= discardAfterReads) {
317                                 // We did enough reads already try to discard some bytes, so we not risk to see a OOME.
318                                 // See https://github.com/netty/netty/issues/4275
319                                 numReads = 0;
320                                 discardSomeReadBytes();
321                             }
322 
323                             int size = out.size();
324                             firedChannelRead |= out.insertSinceRecycled();
325                             fireChannelRead(ctx, out, size);
326                         } finally {
327                             out.recycle();
328                         }
329                     }
330                 } else {
331                     ctx.fireChannelRead(input);
332                 }
333             } while (inputMessages != null && (input = inputMessages.poll()) != null);
334         } else {
335             // Reentrant call. Bail out here and let original call process our message.
336             if (inputMessages == null) {
337                 inputMessages = new ArrayDeque<>(2);
338             }
339             inputMessages.offer(input);
340         }
341     }
342 
343     /**
344      * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
345      */
346     static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
347         if (msgs instanceof CodecOutputList) {
348             fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
349         } else {
350             for (int i = 0; i < numElements; i++) {
351                 ctx.fireChannelRead(msgs.get(i));
352             }
353         }
354     }
355 
356     /**
357      * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
358      */
359     static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
360         for (int i = 0; i < numElements; i ++) {
361             ctx.fireChannelRead(msgs.getUnsafe(i));
362         }
363     }
364 
365     @Override
366     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
367         numReads = 0;
368         discardSomeReadBytes();
369         if (selfFiredChannelRead && !firedChannelRead && !ctx.channel().config().isAutoRead()) {
370             ctx.read();
371         }
372         firedChannelRead = false;
373         selfFiredChannelRead = false;
374         ctx.fireChannelReadComplete();
375     }
376 
377     protected final void discardSomeReadBytes() {
378         if (cumulation != null && !first && cumulation.refCnt() == 1) {
379             // discard some bytes if possible to make more room in the
380             // buffer but only if the refCnt == 1  as otherwise the user may have
381             // used slice().retain() or duplicate().retain().
382             //
383             // See:
384             // - https://github.com/netty/netty/issues/2327
385             // - https://github.com/netty/netty/issues/1764
386             cumulation.discardSomeReadBytes();
387         }
388     }
389 
390     @Override
391     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
392         channelInputClosed(ctx, true);
393     }
394 
395     @Override
396     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
397         if (evt instanceof ChannelInputShutdownEvent) {
398             // The decodeLast method is invoked when a channelInactive event is encountered.
399             // This method is responsible for ending requests in some situations and must be called
400             // when the input has been shutdown.
401             channelInputClosed(ctx, false);
402         }
403         super.userEventTriggered(ctx, evt);
404     }
405 
406     private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
407         CodecOutputList out = CodecOutputList.newInstance();
408         try {
409             channelInputClosed(ctx, out);
410         } catch (DecoderException e) {
411             throw e;
412         } catch (Exception e) {
413             throw new DecoderException(e);
414         } finally {
415             try {
416                 if (cumulation != null) {
417                     cumulation.release();
418                     cumulation = null;
419                 }
420                 int size = out.size();
421                 fireChannelRead(ctx, out, size);
422                 if (size > 0) {
423                     // Something was read, call fireChannelReadComplete()
424                     ctx.fireChannelReadComplete();
425                 }
426                 if (callChannelInactive) {
427                     ctx.fireChannelInactive();
428                 }
429             } finally {
430                 // Recycle in all cases
431                 out.recycle();
432             }
433         }
434     }
435 
436     /**
437      * Called when the input of the channel was closed which may be because it changed to inactive or because of
438      * {@link ChannelInputShutdownEvent}.
439      */
440     void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
441         if (cumulation != null) {
442             callDecode(ctx, cumulation, out);
443             // If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would
444             // be unexpected.
445             if (!ctx.isRemoved()) {
446                 // Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...).
447                 // See https://github.com/netty/netty/issues/10802.
448                 ByteBuf buffer = cumulation == null ? Unpooled.EMPTY_BUFFER : cumulation;
449                 decodeLast(ctx, buffer, out);
450             }
451         } else {
452             decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
453         }
454     }
455 
456     /**
457      * Called once data should be decoded from the given {@link ByteBuf}. This method will call
458      * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
459      *
460      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
461      * @param in            the {@link ByteBuf} from which to read data
462      * @param out           the {@link List} to which decoded messages should be added
463      */
464     protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
465         try {
466             while (in.isReadable()) {
467                 final int outSize = out.size();
468 
469                 if (outSize > 0) {
470                     fireChannelRead(ctx, out, outSize);
471                     out.clear();
472 
473                     // Check if this handler was removed before continuing with decoding.
474                     // If it was removed, it is not safe to continue to operate on the buffer.
475                     //
476                     // See:
477                     // - https://github.com/netty/netty/issues/4635
478                     if (ctx.isRemoved()) {
479                         break;
480                     }
481                 }
482 
483                 int oldInputLength = in.readableBytes();
484                 decodeRemovalReentryProtection(ctx, in, out);
485 
486                 // Check if this handler was removed before continuing the loop.
487                 // If it was removed, it is not safe to continue to operate on the buffer.
488                 //
489                 // See https://github.com/netty/netty/issues/1664
490                 if (ctx.isRemoved()) {
491                     break;
492                 }
493 
494                 if (out.isEmpty()) {
495                     if (oldInputLength == in.readableBytes()) {
496                         break;
497                     } else {
498                         continue;
499                     }
500                 }
501 
502                 if (oldInputLength == in.readableBytes()) {
503                     throw new DecoderException(
504                             StringUtil.simpleClassName(getClass()) +
505                                     ".decode() did not read anything but decoded a message.");
506                 }
507 
508                 if (isSingleDecode()) {
509                     break;
510                 }
511             }
512         } catch (DecoderException e) {
513             throw e;
514         } catch (Exception cause) {
515             throw new DecoderException(cause);
516         }
517     }
518 
519     /**
520      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
521      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
522      * {@link ByteBuf}.
523      *
524      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
525      * @param in            the {@link ByteBuf} from which to read data
526      * @param out           the {@link List} to which decoded messages should be added
527      * @throws Exception    is thrown if an error occurs
528      */
529     protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
530 
531     /**
532      * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
533      * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
534      * {@link ByteBuf}.
535      *
536      * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
537      * @param in            the {@link ByteBuf} from which to read data
538      * @param out           the {@link List} to which decoded messages should be added
539      * @throws Exception    is thrown if an error occurs
540      */
541     final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
542             throws Exception {
543         decodeState = STATE_CALLING_CHILD_DECODE;
544         try {
545             decode(ctx, in, out);
546         } finally {
547             if (inputMessages == null || inputMessages.isEmpty()) {
548                 boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
549                 decodeState = STATE_INIT;
550                 if (removePending) {
551                     fireChannelRead(ctx, out, out.size());
552                     out.clear();
553                     handlerRemoved(ctx);
554                 }
555             }
556         }
557     }
558 
559     /**
560      * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
561      * {@link #channelInactive(ChannelHandlerContext)} was triggered.
562      *
563      * By default, this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
564      * override this for some special cleanup operation.
565      */
566     protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
567         if (in.isReadable()) {
568             // Only call decode() if there is something left in the buffer to decode.
569             // See https://github.com/netty/netty/issues/4386
570             decodeRemovalReentryProtection(ctx, in, out);
571         }
572     }
573 
574     static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
575         int oldBytes = oldCumulation.readableBytes();
576         int newBytes = in.readableBytes();
577         int totalBytes = oldBytes + newBytes;
578         ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, Integer.MAX_VALUE));
579         ByteBuf toRelease = newCumulation;
580         try {
581             // This avoids redundant checks and stack depth compared to calling writeBytes(...)
582             newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
583                 .setBytes(oldBytes, in, in.readerIndex(), newBytes)
584                 .writerIndex(totalBytes);
585             in.readerIndex(in.writerIndex());
586             toRelease = oldCumulation;
587             return newCumulation;
588         } finally {
589             toRelease.release();
590         }
591     }
592 
593     /**
594      * Cumulate {@link ByteBuf}s.
595      */
596     public interface Cumulator {
597         /**
598          * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
599          * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
600          * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
601          */
602         ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
603     }
604 }