View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty5.handler.codec;
16  
17  import io.netty5.buffer.api.Buffer;
18  import io.netty5.buffer.api.BufferAllocator;
19  import io.netty5.buffer.api.CompositeBuffer;
20  import io.netty5.channel.ChannelHandler;
21  import io.netty5.channel.ChannelHandlerAdapter;
22  import io.netty5.channel.ChannelHandlerContext;
23  import io.netty5.channel.ChannelOption;
24  import io.netty5.channel.ChannelPipeline;
25  import io.netty5.channel.ChannelShutdownDirection;
26  import io.netty5.channel.internal.DelegatingChannelHandlerContext;
27  import io.netty5.util.Send;
28  import io.netty5.util.internal.StringUtil;
29  
30  import java.util.Arrays;
31  
32  import static io.netty5.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
33  import static java.util.Objects.requireNonNull;
34  
35  /**
36   * {@link ChannelHandler} which decodes bytes in a stream-like fashion from one {@link Buffer} to an
37   * other Message type.
38   *
39   * For example here is an implementation which reads all readable bytes from
40   * the input {@link Buffer}, creates a new {@link Buffer} and forward it to the next {@link ChannelHandler}
41   * in the {@link ChannelPipeline}.
42   *
43   * <pre>
44   *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
45   *         {@code @Override}
46   *         public void decode({@link ChannelHandlerContext} ctx, {@link Buffer} in)
47   *                 throws {@link Exception} {
48   *             ctx.fireChannelRead(in.readBytes(in.readableBytes()));
49   *         }
50   *     }
51   * </pre>
52   *
53   * <h3>Frame detection</h3>
54   * <p>
55   * Generally frame detection should be handled earlier in the pipeline by adding a
56   * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
57   * or {@link LineBasedFrameDecoder}.
58   * <p>
59   * If a custom frame decoder is required, then one needs to be careful when implementing
60   * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
61   * complete frame by checking {@link Buffer#readableBytes()}. If there are not enough bytes
62   * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
63   * <p>
64   * To check for complete frames without modifying the reader index, use methods like {@link Buffer#getInt(int)}.
65   * One <strong>MUST</strong> use the reader index when using methods like {@link Buffer#getInt(int)}.
66   * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
67   * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
68   * <h3>Pitfalls</h3>
69   * <p>
70   * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
71   * annotated with {@link @Sharable}.
72   */
73  public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
74  
75      /**
76       * Cumulate {@link Buffer}s by merge them into one {@link Buffer}'s, using memory copies.
77       */
78      public static final Cumulator MERGE_CUMULATOR = new MergeCumulator();
79  
80      /**
81       * Cumulate {@link Buffer}s by add them to a {@link CompositeBuffer} and so do no memory copy whenever possible.
82       * Be aware that {@link CompositeBuffer} use a more complex indexing implementation so depending on your use-case
83       * and the decoder implementation this may be slower than just use the {@link #MERGE_CUMULATOR}.
84       */
85      public static final Cumulator COMPOSITE_CUMULATOR = new CompositeBufferCumulator();
86  
87      private final int discardAfterReads = 16;
88      private final Cumulator cumulator;
89  
90      private Buffer cumulation;
91      private boolean singleDecode;
92      private boolean first;
93      /**
94       * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
95       * when {@link ChannelOption#AUTO_READ} is {@code false}.
96       */
97      private boolean firedChannelRead;
98      private int numReads;
99      private ByteToMessageDecoderContext context;
100 
101     protected ByteToMessageDecoder() {
102         this(MERGE_CUMULATOR);
103     }
104 
105     protected ByteToMessageDecoder(Cumulator cumulator) {
106         this.cumulator = requireNonNull(cumulator, "cumulator");
107     }
108 
109     @Override
110     public final boolean isSharable() {
111         // Can't be sharable as we keep state.
112         return false;
113     }
114 
115     /**
116      * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
117      * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
118      *
119      * Default is {@code false} as this has performance impacts.
120      */
121     public void setSingleDecode(boolean singleDecode) {
122         this.singleDecode = singleDecode;
123     }
124 
125     /**
126      * If {@code true} then only one message is decoded on each
127      * {@link #channelRead(ChannelHandlerContext, Object)} call.
128      *
129      * Default is {@code false} as this has performance impacts.
130      */
131     public boolean isSingleDecode() {
132         return singleDecode;
133     }
134 
135     /**
136      * Returns the actual number of readable bytes in the internal cumulative
137      * buffer of this decoder. You usually do not need to rely on this value
138      * to write a decoder. Use it only when you must use it at your own risk.
139      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
140      */
141     protected int actualReadableBytes() {
142         return internalBuffer().readableBytes();
143     }
144 
145     /**
146      * Returns the internal cumulative buffer of this decoder, if exists, else {@code null}. You usually
147      * do not need to access the internal buffer directly to write a decoder.
148      * Use it only when you must use it at your own risk.
149      *
150      * @return Internal {@link Buffer} if exists, else {@code null}.
151      */
152     protected Buffer internalBuffer() {
153         return cumulation;
154     }
155 
156     @Override
157     public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
158         context = new ByteToMessageDecoderContext(ctx);
159         handlerAdded0(context);
160     }
161 
162     protected void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
163     }
164 
165     @Override
166     public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
167         Buffer buf = cumulation;
168         if (buf != null) {
169             // Directly set this to null so we are sure we not access it in any other method here anymore.
170             cumulation = null;
171             numReads = 0;
172             int readable = buf.readableBytes();
173             if (readable > 0) {
174                 ctx.fireChannelRead(buf);
175                 ctx.fireChannelReadComplete();
176             } else {
177                 buf.close();
178             }
179         }
180         handlerRemoved0(context);
181     }
182 
183     /**
184      * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't
185      * handle events anymore.
186      */
187     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
188 
189     @Override
190     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
191         if (msg instanceof Buffer) {
192             try {
193                 Buffer data = (Buffer) msg;
194                 first = cumulation == null;
195                 if (first) {
196                     cumulation = data;
197                 } else {
198                     cumulation = cumulator.cumulate(ctx.bufferAllocator(), cumulation, data);
199                 }
200                 assert context.delegatingCtx() == ctx || ctx == context;
201 
202                 callDecode(context, cumulation);
203             } catch (DecoderException e) {
204                 throw e;
205             } catch (Exception e) {
206                 throw new DecoderException(e);
207             } finally {
208                 if (cumulation != null && cumulation.readableBytes() == 0) {
209                     numReads = 0;
210                     if (cumulation.isAccessible()) {
211                         cumulation.close();
212                     }
213                     cumulation = null;
214                 } else if (++numReads >= discardAfterReads) {
215                     // We did enough reads already try to discard some bytes so we not risk to see a OOME.
216                     // See https://github.com/netty/netty/issues/4275
217                     numReads = 0;
218                     discardSomeReadBytes();
219                 }
220 
221                 firedChannelRead |= context.fireChannelReadCallCount() > 0;
222                 context.reset();
223             }
224         } else {
225             ctx.fireChannelRead(msg);
226         }
227     }
228 
229     @Override
230     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
231         numReads = 0;
232         discardSomeReadBytes();
233         if (!firedChannelRead && !ctx.channel().getOption(ChannelOption.AUTO_READ)) {
234             ctx.read();
235         }
236         firedChannelRead = false;
237         ctx.fireChannelReadComplete();
238     }
239 
240     protected final void discardSomeReadBytes() {
241         if (cumulation != null && !first) {
242             // discard some bytes if possible to make more room in the buffer.
243             cumulator.discardSomeReadBytes(cumulation);
244         }
245     }
246 
247     @Override
248     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
249         assert context.delegatingCtx() == ctx || ctx == context;
250         channelInputClosed(context, true);
251     }
252 
253     @Override
254     public void channelShutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) throws Exception {
255         ctx.fireChannelShutdown(direction);
256         if (direction == ChannelShutdownDirection.Inbound) {
257             // The decodeLast method is invoked when a channelInactive event is encountered.
258             // This method is responsible for ending requests in some situations and must be called
259             // when the input has been shutdown.
260             assert context.delegatingCtx() == ctx || ctx == context;
261             channelInputClosed(context, false);
262         }
263     }
264 
265     private void channelInputClosed(ByteToMessageDecoderContext ctx, boolean callChannelInactive) {
266         try {
267             channelInputClosed(ctx);
268         } catch (DecoderException e) {
269             throw e;
270         } catch (Exception e) {
271             throw new DecoderException(e);
272         } finally {
273             if (cumulation != null) {
274                 cumulation.close();
275                 cumulation = null;
276             }
277             if (ctx.fireChannelReadCallCount() > 0) {
278                 ctx.reset();
279                 // Something was read, call fireChannelReadComplete()
280                 ctx.fireChannelReadComplete();
281             }
282             if (callChannelInactive) {
283                 ctx.fireChannelInactive();
284             }
285         }
286     }
287 
288     /**
289      * Called when the input of the channel was closed which may be because it changed to inactive or because of
290      * shutdown.
291      */
292     void channelInputClosed(ByteToMessageDecoderContext ctx) throws Exception {
293         if (cumulation != null) {
294             callDecode(ctx, cumulation);
295             // If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would
296             // be unexpected.
297             if (!ctx.isRemoved()) {
298                 // Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...).
299                 // See https://github.com/netty/netty/issues/10802.
300                 Buffer buffer = cumulation == null ? ctx.bufferAllocator().allocate(0) : cumulation;
301                 decodeLast(ctx, buffer);
302             }
303         } else {
304             decodeLast(ctx, ctx.bufferAllocator().allocate(0));
305         }
306     }
307 
308     /**
309      * Called once data should be decoded from the given {@link Buffer}. This method will call
310      * {@link #decode(ChannelHandlerContext, Buffer)} as long as decoding should take place.
311      *
312      * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
313      * @param in  the {@link Buffer} from which to read data
314      */
315     void callDecode(ByteToMessageDecoderContext ctx, Buffer in) {
316         try {
317             while (in.readableBytes() > 0 && !ctx.isRemoved()) {
318 
319                 int oldInputLength = in.readableBytes();
320                 int numReadCalled = ctx.fireChannelReadCallCount();
321                 decodeRemovalReentryProtection(ctx, in);
322 
323                 // Check if this handler was removed before continuing the loop.
324                 // If it was removed, it is not safe to continue to operate on the buffer.
325                 //
326                 // See https://github.com/netty/netty/issues/1664
327                 if (ctx.isRemoved()) {
328                     break;
329                 }
330 
331                 if (numReadCalled == ctx.fireChannelReadCallCount()) {
332                     if (oldInputLength == in.readableBytes()) {
333                         break;
334                     } else {
335                         continue;
336                     }
337                 }
338 
339                 if (oldInputLength == in.readableBytes()) {
340                     throw new DecoderException(
341                             StringUtil.simpleClassName(getClass()) +
342                                     ".decode() did not read anything but decoded a message.");
343                 }
344 
345                 if (isSingleDecode()) {
346                     break;
347                 }
348             }
349         } catch (DecoderException e) {
350             throw e;
351         } catch (Exception cause) {
352             throw new DecoderException(cause);
353         }
354     }
355 
356     /**
357      * Decode the from one {@link Buffer} to another. This method will be called till either the input
358      * {@link Buffer} has nothing to read when return from this method or till nothing was read from the input
359      * {@link Buffer}.
360      *
361      * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
362      * @param in  the {@link Buffer} from which to read data
363      * @throws Exception is thrown if an error occurs
364      */
365     protected abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception;
366 
367     /**
368      * Decode the from one {@link Buffer} to an other. This method will be called till either the input
369      * {@link Buffer} has nothing to read when return from this method or till nothing was read from the input
370      * {@link Buffer}.
371      *
372      * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
373      * @param in the {@link Buffer} from which to read data
374      * @throws Exception is thrown if an error occurs
375      */
376     final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, Buffer in)
377             throws Exception {
378         decode(ctx, in);
379     }
380 
381     /**
382      * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
383      * {@link #channelInactive(ChannelHandlerContext)} was triggered.
384      *
385      * By default this will just call {@link #decode(ChannelHandlerContext, Buffer)} but sub-classes may
386      * override this for some special cleanup operation.
387      */
388     protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
389         if (in.readableBytes() > 0) {
390             // Only call decode() if there is something left in the buffer to decode.
391             // See https://github.com/netty/netty/issues/4386
392             decodeRemovalReentryProtection(ctx, in);
393         }
394     }
395 
396     /**
397      * Cumulate {@link Buffer}s.
398      */
399     public interface Cumulator {
400         /**
401          * Cumulate the given {@link Buffer}s and return the {@link Buffer} that holds the cumulated bytes.
402          * The implementation is responsible to correctly handle the life-cycle of the given {@link Buffer}s and so
403          * call {@link Buffer#close()} if a {@link Buffer} is fully consumed.
404          */
405         Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in);
406 
407         /**
408          * Consume the given buffer and return a new buffer with the same readable data, but where any data before the
409          * read offset may have been removed.
410          * The returned buffer may be the same buffer instance as the buffer passed in.
411          *
412          * @param cumulation The buffer we wish to trim already processed bytes from.
413          * @return A buffer where the bytes before the reader-offset have been removed.
414          */
415         Buffer discardSomeReadBytes(Buffer cumulation);
416     }
417 
418     // Package private so we can also make use of it in ReplayingDecoder.
419     static final class ByteToMessageDecoderContext extends DelegatingChannelHandlerContext {
420         private int fireChannelReadCalled;
421 
422         private ByteToMessageDecoderContext(ChannelHandlerContext ctx) {
423             super(ctx);
424         }
425 
426         void reset() {
427             fireChannelReadCalled = 0;
428         }
429 
430         int fireChannelReadCallCount() {
431             return fireChannelReadCalled;
432         }
433 
434         @Override
435         public ChannelHandlerContext fireChannelRead(Object msg) {
436             fireChannelReadCalled ++;
437             super.fireChannelRead(msg);
438             return this;
439         }
440     }
441 
442     private static final class CompositeBufferCumulator implements Cumulator {
443         @Override
444         public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
445             if (cumulation.readableBytes() == 0) {
446                 cumulation.close();
447                 return in;
448             }
449             try (in) {
450                 if (in.readableBytes() == 0) {
451                     return cumulation;
452                 }
453                 if (cumulation.readOnly()) {
454                     Buffer tmp = cumulation.copy();
455                     cumulation.close();
456                     cumulation = tmp;
457                 }
458                 if (CompositeBuffer.isComposite(cumulation)) {
459                     CompositeBuffer composite = (CompositeBuffer) cumulation;
460                     composite.extendWith(prepareInForCompose(in));
461                     return composite;
462                 }
463                 return alloc.compose(Arrays.asList(cumulation.send(), prepareInForCompose(in)));
464             }
465         }
466 
467         private static Send<Buffer> prepareInForCompose(Buffer in) {
468             return in.readOnly() ? in.copy().send() : in.send();
469         }
470 
471         @Override
472         public Buffer discardSomeReadBytes(Buffer cumulation) {
473             // Compact is slow on composite buffers, and we also need to avoid leaving any writable space at the end.
474             // Using readSplit(0), we grab zero readable bytes in the split-off buffer, but all the already-read
475             // bytes get cut off from the cumulation buffer.
476             cumulation.readSplit(0).close();
477             return cumulation;
478         }
479 
480         @Override
481         public String toString() {
482             return "CompositeBufferCumulator";
483         }
484     }
485 
486     private static final class MergeCumulator implements Cumulator {
487         @Override
488         public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
489             if (cumulation.readableBytes() == 0) {
490                 // If cumulation is empty and input buffer is contiguous, use it directly
491                 cumulation.close();
492                 return in;
493             }
494             // We must close input Buffer in all cases as otherwise it may produce a leak if writeBytes(...) throw
495             // for whatever close (for example because of OutOfMemoryError)
496             try (in) {
497                 final int required = in.readableBytes();
498                 if (required > cumulation.writableBytes() || cumulation.readOnly()) {
499                     return expandCumulationAndWrite(alloc, cumulation, in);
500                 }
501                 cumulation.writeBytes(in);
502                 return cumulation;
503             }
504         }
505 
506         @Override
507         public Buffer discardSomeReadBytes(Buffer cumulation) {
508             if (cumulation.readerOffset() > cumulation.writableBytes()) {
509                 cumulation.compact();
510             }
511             return cumulation;
512         }
513 
514         private static Buffer expandCumulationAndWrite(BufferAllocator alloc, Buffer oldCumulation, Buffer in) {
515             final int newSize = safeFindNextPositivePowerOfTwo(oldCumulation.readableBytes() + in.readableBytes());
516             Buffer newCumulation = oldCumulation.readOnly() ? alloc.allocate(newSize) :
517                     oldCumulation.ensureWritable(newSize);
518             try {
519                 if (newCumulation != oldCumulation) {
520                     newCumulation.writeBytes(oldCumulation);
521                 }
522                 newCumulation.writeBytes(in);
523                 return newCumulation;
524             } finally {
525                 if (newCumulation != oldCumulation) {
526                     oldCumulation.close();
527                 }
528             }
529         }
530 
531         @Override
532         public String toString() {
533             return "MergeCumulator";
534         }
535     }
536 }