1 /*
2 * Copyright 2021 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5 * "License"); you may not use this file except in compliance with the License. You may obtain a
6 * copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
13 * the License.
14 */
15 package io.netty5.handler.codec;
16
17 import io.netty5.buffer.api.Buffer;
18 import io.netty5.buffer.api.BufferAllocator;
19 import io.netty5.buffer.api.CompositeBuffer;
20 import io.netty5.channel.ChannelHandler;
21 import io.netty5.channel.ChannelHandlerAdapter;
22 import io.netty5.channel.ChannelHandlerContext;
23 import io.netty5.channel.ChannelOption;
24 import io.netty5.channel.ChannelPipeline;
25 import io.netty5.channel.ChannelShutdownDirection;
26 import io.netty5.channel.internal.DelegatingChannelHandlerContext;
27 import io.netty5.util.Send;
28 import io.netty5.util.internal.StringUtil;
29
30 import java.util.Arrays;
31
32 import static io.netty5.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
33 import static java.util.Objects.requireNonNull;
34
35 /**
36 * {@link ChannelHandler} which decodes bytes in a stream-like fashion from one {@link Buffer} to an
37 * other Message type.
38 *
39 * For example here is an implementation which reads all readable bytes from
40 * the input {@link Buffer}, creates a new {@link Buffer} and forward it to the next {@link ChannelHandler}
41 * in the {@link ChannelPipeline}.
42 *
43 * <pre>
44 * public class SquareDecoder extends {@link ByteToMessageDecoder} {
45 * {@code @Override}
46 * public void decode({@link ChannelHandlerContext} ctx, {@link Buffer} in)
47 * throws {@link Exception} {
48 * ctx.fireChannelRead(in.readBytes(in.readableBytes()));
49 * }
50 * }
51 * </pre>
52 *
53 * <h3>Frame detection</h3>
54 * <p>
55 * Generally frame detection should be handled earlier in the pipeline by adding a
56 * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
57 * or {@link LineBasedFrameDecoder}.
58 * <p>
59 * If a custom frame decoder is required, then one needs to be careful when implementing
60 * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
61 * complete frame by checking {@link Buffer#readableBytes()}. If there are not enough bytes
62 * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
63 * <p>
64 * To check for complete frames without modifying the reader index, use methods like {@link Buffer#getInt(int)}.
65 * One <strong>MUST</strong> use the reader index when using methods like {@link Buffer#getInt(int)}.
66 * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
67 * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
68 * <h3>Pitfalls</h3>
69 * <p>
70 * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
71 * annotated with {@link @Sharable}.
72 */
73 public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
74
75 /**
76 * Cumulate {@link Buffer}s by merge them into one {@link Buffer}'s, using memory copies.
77 */
78 public static final Cumulator MERGE_CUMULATOR = new MergeCumulator();
79
80 /**
81 * Cumulate {@link Buffer}s by add them to a {@link CompositeBuffer} and so do no memory copy whenever possible.
82 * Be aware that {@link CompositeBuffer} use a more complex indexing implementation so depending on your use-case
83 * and the decoder implementation this may be slower than just use the {@link #MERGE_CUMULATOR}.
84 */
85 public static final Cumulator COMPOSITE_CUMULATOR = new CompositeBufferCumulator();
86
87 private final int discardAfterReads = 16;
88 private final Cumulator cumulator;
89
90 private Buffer cumulation;
91 private boolean singleDecode;
92 private boolean first;
93 /**
94 * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
95 * when {@link ChannelOption#AUTO_READ} is {@code false}.
96 */
97 private boolean firedChannelRead;
98 private int numReads;
99 private ByteToMessageDecoderContext context;
100
101 protected ByteToMessageDecoder() {
102 this(MERGE_CUMULATOR);
103 }
104
105 protected ByteToMessageDecoder(Cumulator cumulator) {
106 this.cumulator = requireNonNull(cumulator, "cumulator");
107 }
108
109 @Override
110 public final boolean isSharable() {
111 // Can't be sharable as we keep state.
112 return false;
113 }
114
115 /**
116 * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
117 * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
118 *
119 * Default is {@code false} as this has performance impacts.
120 */
121 public void setSingleDecode(boolean singleDecode) {
122 this.singleDecode = singleDecode;
123 }
124
125 /**
126 * If {@code true} then only one message is decoded on each
127 * {@link #channelRead(ChannelHandlerContext, Object)} call.
128 *
129 * Default is {@code false} as this has performance impacts.
130 */
131 public boolean isSingleDecode() {
132 return singleDecode;
133 }
134
135 /**
136 * Returns the actual number of readable bytes in the internal cumulative
137 * buffer of this decoder. You usually do not need to rely on this value
138 * to write a decoder. Use it only when you must use it at your own risk.
139 * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
140 */
141 protected int actualReadableBytes() {
142 return internalBuffer().readableBytes();
143 }
144
145 /**
146 * Returns the internal cumulative buffer of this decoder, if exists, else {@code null}. You usually
147 * do not need to access the internal buffer directly to write a decoder.
148 * Use it only when you must use it at your own risk.
149 *
150 * @return Internal {@link Buffer} if exists, else {@code null}.
151 */
152 protected Buffer internalBuffer() {
153 return cumulation;
154 }
155
156 @Override
157 public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
158 context = new ByteToMessageDecoderContext(ctx);
159 handlerAdded0(context);
160 }
161
162 protected void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
163 }
164
165 @Override
166 public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
167 Buffer buf = cumulation;
168 if (buf != null) {
169 // Directly set this to null so we are sure we not access it in any other method here anymore.
170 cumulation = null;
171 numReads = 0;
172 int readable = buf.readableBytes();
173 if (readable > 0) {
174 ctx.fireChannelRead(buf);
175 ctx.fireChannelReadComplete();
176 } else {
177 buf.close();
178 }
179 }
180 handlerRemoved0(context);
181 }
182
183 /**
184 * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't
185 * handle events anymore.
186 */
187 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
188
189 @Override
190 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
191 if (msg instanceof Buffer) {
192 try {
193 Buffer data = (Buffer) msg;
194 first = cumulation == null;
195 if (first) {
196 cumulation = data;
197 } else {
198 cumulation = cumulator.cumulate(ctx.bufferAllocator(), cumulation, data);
199 }
200 assert context.delegatingCtx() == ctx || ctx == context;
201
202 callDecode(context, cumulation);
203 } catch (DecoderException e) {
204 throw e;
205 } catch (Exception e) {
206 throw new DecoderException(e);
207 } finally {
208 if (cumulation != null && cumulation.readableBytes() == 0) {
209 numReads = 0;
210 if (cumulation.isAccessible()) {
211 cumulation.close();
212 }
213 cumulation = null;
214 } else if (++numReads >= discardAfterReads) {
215 // We did enough reads already try to discard some bytes so we not risk to see a OOME.
216 // See https://github.com/netty/netty/issues/4275
217 numReads = 0;
218 discardSomeReadBytes();
219 }
220
221 firedChannelRead |= context.fireChannelReadCallCount() > 0;
222 context.reset();
223 }
224 } else {
225 ctx.fireChannelRead(msg);
226 }
227 }
228
229 @Override
230 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
231 numReads = 0;
232 discardSomeReadBytes();
233 if (!firedChannelRead && !ctx.channel().getOption(ChannelOption.AUTO_READ)) {
234 ctx.read();
235 }
236 firedChannelRead = false;
237 ctx.fireChannelReadComplete();
238 }
239
240 protected final void discardSomeReadBytes() {
241 if (cumulation != null && !first) {
242 // discard some bytes if possible to make more room in the buffer.
243 cumulator.discardSomeReadBytes(cumulation);
244 }
245 }
246
247 @Override
248 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
249 assert context.delegatingCtx() == ctx || ctx == context;
250 channelInputClosed(context, true);
251 }
252
253 @Override
254 public void channelShutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) throws Exception {
255 ctx.fireChannelShutdown(direction);
256 if (direction == ChannelShutdownDirection.Inbound) {
257 // The decodeLast method is invoked when a channelInactive event is encountered.
258 // This method is responsible for ending requests in some situations and must be called
259 // when the input has been shutdown.
260 assert context.delegatingCtx() == ctx || ctx == context;
261 channelInputClosed(context, false);
262 }
263 }
264
265 private void channelInputClosed(ByteToMessageDecoderContext ctx, boolean callChannelInactive) {
266 try {
267 channelInputClosed(ctx);
268 } catch (DecoderException e) {
269 throw e;
270 } catch (Exception e) {
271 throw new DecoderException(e);
272 } finally {
273 if (cumulation != null) {
274 cumulation.close();
275 cumulation = null;
276 }
277 if (ctx.fireChannelReadCallCount() > 0) {
278 ctx.reset();
279 // Something was read, call fireChannelReadComplete()
280 ctx.fireChannelReadComplete();
281 }
282 if (callChannelInactive) {
283 ctx.fireChannelInactive();
284 }
285 }
286 }
287
288 /**
289 * Called when the input of the channel was closed which may be because it changed to inactive or because of
290 * shutdown.
291 */
292 void channelInputClosed(ByteToMessageDecoderContext ctx) throws Exception {
293 if (cumulation != null) {
294 callDecode(ctx, cumulation);
295 // If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would
296 // be unexpected.
297 if (!ctx.isRemoved()) {
298 // Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...).
299 // See https://github.com/netty/netty/issues/10802.
300 Buffer buffer = cumulation == null ? ctx.bufferAllocator().allocate(0) : cumulation;
301 decodeLast(ctx, buffer);
302 }
303 } else {
304 decodeLast(ctx, ctx.bufferAllocator().allocate(0));
305 }
306 }
307
308 /**
309 * Called once data should be decoded from the given {@link Buffer}. This method will call
310 * {@link #decode(ChannelHandlerContext, Buffer)} as long as decoding should take place.
311 *
312 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
313 * @param in the {@link Buffer} from which to read data
314 */
315 void callDecode(ByteToMessageDecoderContext ctx, Buffer in) {
316 try {
317 while (in.readableBytes() > 0 && !ctx.isRemoved()) {
318
319 int oldInputLength = in.readableBytes();
320 int numReadCalled = ctx.fireChannelReadCallCount();
321 decodeRemovalReentryProtection(ctx, in);
322
323 // Check if this handler was removed before continuing the loop.
324 // If it was removed, it is not safe to continue to operate on the buffer.
325 //
326 // See https://github.com/netty/netty/issues/1664
327 if (ctx.isRemoved()) {
328 break;
329 }
330
331 if (numReadCalled == ctx.fireChannelReadCallCount()) {
332 if (oldInputLength == in.readableBytes()) {
333 break;
334 } else {
335 continue;
336 }
337 }
338
339 if (oldInputLength == in.readableBytes()) {
340 throw new DecoderException(
341 StringUtil.simpleClassName(getClass()) +
342 ".decode() did not read anything but decoded a message.");
343 }
344
345 if (isSingleDecode()) {
346 break;
347 }
348 }
349 } catch (DecoderException e) {
350 throw e;
351 } catch (Exception cause) {
352 throw new DecoderException(cause);
353 }
354 }
355
356 /**
357 * Decode the from one {@link Buffer} to another. This method will be called till either the input
358 * {@link Buffer} has nothing to read when return from this method or till nothing was read from the input
359 * {@link Buffer}.
360 *
361 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
362 * @param in the {@link Buffer} from which to read data
363 * @throws Exception is thrown if an error occurs
364 */
365 protected abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception;
366
367 /**
368 * Decode the from one {@link Buffer} to an other. This method will be called till either the input
369 * {@link Buffer} has nothing to read when return from this method or till nothing was read from the input
370 * {@link Buffer}.
371 *
372 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
373 * @param in the {@link Buffer} from which to read data
374 * @throws Exception is thrown if an error occurs
375 */
376 final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, Buffer in)
377 throws Exception {
378 decode(ctx, in);
379 }
380
381 /**
382 * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
383 * {@link #channelInactive(ChannelHandlerContext)} was triggered.
384 *
385 * By default this will just call {@link #decode(ChannelHandlerContext, Buffer)} but sub-classes may
386 * override this for some special cleanup operation.
387 */
388 protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
389 if (in.readableBytes() > 0) {
390 // Only call decode() if there is something left in the buffer to decode.
391 // See https://github.com/netty/netty/issues/4386
392 decodeRemovalReentryProtection(ctx, in);
393 }
394 }
395
396 /**
397 * Cumulate {@link Buffer}s.
398 */
399 public interface Cumulator {
400 /**
401 * Cumulate the given {@link Buffer}s and return the {@link Buffer} that holds the cumulated bytes.
402 * The implementation is responsible to correctly handle the life-cycle of the given {@link Buffer}s and so
403 * call {@link Buffer#close()} if a {@link Buffer} is fully consumed.
404 */
405 Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in);
406
407 /**
408 * Consume the given buffer and return a new buffer with the same readable data, but where any data before the
409 * read offset may have been removed.
410 * The returned buffer may be the same buffer instance as the buffer passed in.
411 *
412 * @param cumulation The buffer we wish to trim already processed bytes from.
413 * @return A buffer where the bytes before the reader-offset have been removed.
414 */
415 Buffer discardSomeReadBytes(Buffer cumulation);
416 }
417
418 // Package private so we can also make use of it in ReplayingDecoder.
419 static final class ByteToMessageDecoderContext extends DelegatingChannelHandlerContext {
420 private int fireChannelReadCalled;
421
422 private ByteToMessageDecoderContext(ChannelHandlerContext ctx) {
423 super(ctx);
424 }
425
426 void reset() {
427 fireChannelReadCalled = 0;
428 }
429
430 int fireChannelReadCallCount() {
431 return fireChannelReadCalled;
432 }
433
434 @Override
435 public ChannelHandlerContext fireChannelRead(Object msg) {
436 fireChannelReadCalled ++;
437 super.fireChannelRead(msg);
438 return this;
439 }
440 }
441
442 private static final class CompositeBufferCumulator implements Cumulator {
443 @Override
444 public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
445 if (cumulation.readableBytes() == 0) {
446 cumulation.close();
447 return in;
448 }
449 try (in) {
450 if (in.readableBytes() == 0) {
451 return cumulation;
452 }
453 if (cumulation.readOnly()) {
454 Buffer tmp = cumulation.copy();
455 cumulation.close();
456 cumulation = tmp;
457 }
458 if (CompositeBuffer.isComposite(cumulation)) {
459 CompositeBuffer composite = (CompositeBuffer) cumulation;
460 composite.extendWith(prepareInForCompose(in));
461 return composite;
462 }
463 return alloc.compose(Arrays.asList(cumulation.send(), prepareInForCompose(in)));
464 }
465 }
466
467 private static Send<Buffer> prepareInForCompose(Buffer in) {
468 return in.readOnly() ? in.copy().send() : in.send();
469 }
470
471 @Override
472 public Buffer discardSomeReadBytes(Buffer cumulation) {
473 // Compact is slow on composite buffers, and we also need to avoid leaving any writable space at the end.
474 // Using readSplit(0), we grab zero readable bytes in the split-off buffer, but all the already-read
475 // bytes get cut off from the cumulation buffer.
476 cumulation.readSplit(0).close();
477 return cumulation;
478 }
479
480 @Override
481 public String toString() {
482 return "CompositeBufferCumulator";
483 }
484 }
485
486 private static final class MergeCumulator implements Cumulator {
487 @Override
488 public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
489 if (cumulation.readableBytes() == 0) {
490 // If cumulation is empty and input buffer is contiguous, use it directly
491 cumulation.close();
492 return in;
493 }
494 // We must close input Buffer in all cases as otherwise it may produce a leak if writeBytes(...) throw
495 // for whatever close (for example because of OutOfMemoryError)
496 try (in) {
497 final int required = in.readableBytes();
498 if (required > cumulation.writableBytes() || cumulation.readOnly()) {
499 return expandCumulationAndWrite(alloc, cumulation, in);
500 }
501 cumulation.writeBytes(in);
502 return cumulation;
503 }
504 }
505
506 @Override
507 public Buffer discardSomeReadBytes(Buffer cumulation) {
508 if (cumulation.readerOffset() > cumulation.writableBytes()) {
509 cumulation.compact();
510 }
511 return cumulation;
512 }
513
514 private static Buffer expandCumulationAndWrite(BufferAllocator alloc, Buffer oldCumulation, Buffer in) {
515 final int newSize = safeFindNextPositivePowerOfTwo(oldCumulation.readableBytes() + in.readableBytes());
516 Buffer newCumulation = oldCumulation.readOnly() ? alloc.allocate(newSize) :
517 oldCumulation.ensureWritable(newSize);
518 try {
519 if (newCumulation != oldCumulation) {
520 newCumulation.writeBytes(oldCumulation);
521 }
522 newCumulation.writeBytes(in);
523 return newCumulation;
524 } finally {
525 if (newCumulation != oldCumulation) {
526 oldCumulation.close();
527 }
528 }
529 }
530
531 @Override
532 public String toString() {
533 return "MergeCumulator";
534 }
535 }
536 }