1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.codec;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.CompositeByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelConfig;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.channel.socket.ChannelInputShutdownEvent;
26 import io.netty.util.IllegalReferenceCountException;
27 import io.netty.util.internal.ObjectUtil;
28 import io.netty.util.internal.StringUtil;
29
30 import java.util.ArrayDeque;
31 import java.util.List;
32 import java.util.Queue;
33
34 import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
35 import static io.netty.util.internal.ObjectUtil.checkPositive;
36
37 /**
38 * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to
39 * another Message type.
40 * <p>
41 * For example here is an implementation which reads all readable bytes from
42 * the input {@link ByteBuf} and create a new {@link ByteBuf}.
43 *
44 * <pre>
45 * public class SquareDecoder extends {@link ByteToMessageDecoder} {
46 * {@code @Override}
47 * public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List<Object> out)
48 * throws {@link Exception} {
49 * out.add(in.readBytes(in.readableBytes()));
50 * }
51 * }
52 * </pre>
53 *
54 * <h3>Frame detection</h3>
55 * <p>
56 * Generally frame detection should be handled earlier in the pipeline by adding a
57 * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
58 * or {@link LineBasedFrameDecoder}.
59 * <p>
60 * If a custom frame decoder is required, then one needs to be careful when implementing
61 * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
62 * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
63 * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
64 * <p>
65 * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
66 * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
67 * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
68 * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
69 * <h3>Pitfalls</h3>
70 * <p>
71 * Be aware that subclasses of {@link ByteToMessageDecoder} <strong>MUST NOT</strong> be
72 * annotated with {@code @Sharable}.
73 * <p>
74 * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
75 * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
76 * to avoid leaking memory.
77 */
78 public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
79
80 /**
81 * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
82 */
83 public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
84 @Override
85 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
86 if (cumulation == in) {
87 // when the in buffer is the same as the cumulation it is doubly retained, release it once
88 in.release();
89 return cumulation;
90 }
91 if (!cumulation.isReadable() && in.isContiguous()) {
92 // If cumulation is empty and input buffer is contiguous, use it directly
93 cumulation.release();
94 return in;
95 }
96 try {
97 final int required = in.readableBytes();
98 if (required > cumulation.maxWritableBytes() ||
99 required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 ||
100 cumulation.isReadOnly()) {
101 // Expand cumulation (by replacing it) under the following conditions:
102 // - cumulation cannot be resized to accommodate the additional data
103 // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
104 // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
105 return expandCumulation(alloc, cumulation, in);
106 }
107 cumulation.writeBytes(in, in.readerIndex(), required);
108 in.readerIndex(in.writerIndex());
109 return cumulation;
110 } finally {
111 // We must release in all cases as otherwise it may produce a leak if writeBytes(...) throw
112 // for whatever release (for example because of OutOfMemoryError)
113 in.release();
114 }
115 }
116 };
117
118 /**
119 * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
120 * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
121 * and the decoder implementation this may be slower than just use the {@link #MERGE_CUMULATOR}.
122 */
123 public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
124 @Override
125 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
126 if (cumulation == in) {
127 // when the in buffer is the same as the cumulation it is doubly retained, release it once
128 in.release();
129 return cumulation;
130 }
131 if (!cumulation.isReadable()) {
132 cumulation.release();
133 return in;
134 }
135 CompositeByteBuf composite = null;
136 try {
137 if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
138 composite = (CompositeByteBuf) cumulation;
139 // Writer index must equal capacity if we are going to "write"
140 // new components to the end
141 if (composite.writerIndex() != composite.capacity()) {
142 composite.capacity(composite.writerIndex());
143 }
144 } else {
145 composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
146 }
147 composite.addFlattenedComponents(true, in);
148 in = null;
149 return composite;
150 } finally {
151 if (in != null) {
152 // We must release if the ownership was not transferred as otherwise it may produce a leak
153 in.release();
154 // Also release any new buffer allocated if we're not returning it
155 if (composite != null && composite != cumulation) {
156 composite.release();
157 }
158 }
159 }
160 }
161 };
162
163 private static final byte STATE_INIT = 0;
164 private static final byte STATE_CALLING_CHILD_DECODE = 1;
165 private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
166
167 // Used to guard the inputs for reentrant channelRead calls
168 private Queue<Object> inputMessages;
169 ByteBuf cumulation;
170 private Cumulator cumulator = MERGE_CUMULATOR;
171 private boolean singleDecode;
172 private boolean first;
173
174 /**
175 * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
176 * when {@link ChannelConfig#isAutoRead()} is {@code false}.
177 */
178 private boolean firedChannelRead;
179
180 private boolean selfFiredChannelRead;
181
182 /**
183 * A bitmask where the bits are defined as
184 * <ul>
185 * <li>{@link #STATE_INIT}</li>
186 * <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
187 * <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
188 * </ul>
189 */
190 private byte decodeState = STATE_INIT;
191 private int discardAfterReads = 16;
192 private int numReads;
193
/**
 * Creates a new instance. The only work done here is the call to {@code ensureNotSharable()}, which is
 * inherited and presumably rejects subclasses marked as sharable (a decoder keeps per-instance buffer state).
 */
protected ByteToMessageDecoder() {
    ensureNotSharable();
}
197
198 /**
199 * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
200 * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
201 *
202 * Default is {@code false} as this has performance impacts.
203 */
204 public void setSingleDecode(boolean singleDecode) {
205 this.singleDecode = singleDecode;
206 }
207
208 /**
209 * If {@code true} then only one message is decoded on each
210 * {@link #channelRead(ChannelHandlerContext, Object)} call.
211 *
212 * Default is {@code false} as this has performance impacts.
213 */
214 public boolean isSingleDecode() {
215 return singleDecode;
216 }
217
218 /**
219 * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
220 */
221 public void setCumulator(Cumulator cumulator) {
222 this.cumulator = ObjectUtil.checkNotNull(cumulator, "cumulator");
223 }
224
225 /**
226 * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
227 * The default is {@code 16}.
228 */
229 public void setDiscardAfterReads(int discardAfterReads) {
230 checkPositive(discardAfterReads, "discardAfterReads");
231 this.discardAfterReads = discardAfterReads;
232 }
233
234 /**
235 * Returns the actual number of readable bytes in the internal cumulative
236 * buffer of this decoder. You usually do not need to rely on this value
237 * to write a decoder. Use it only when you must use it at your own risk.
238 * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
239 */
240 protected int actualReadableBytes() {
241 return internalBuffer().readableBytes();
242 }
243
244 /**
245 * Returns the internal cumulative buffer of this decoder. You usually
246 * do not need to access the internal buffer directly to write a decoder.
247 * Use it only when you must use it at your own risk.
248 */
249 protected ByteBuf internalBuffer() {
250 if (cumulation != null) {
251 return cumulation;
252 } else {
253 return Unpooled.EMPTY_BUFFER;
254 }
255 }
256
257 @Override
258 public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
259 if (decodeState == STATE_CALLING_CHILD_DECODE) {
260 decodeState = STATE_HANDLER_REMOVED_PENDING;
261 return;
262 }
263 ByteBuf buf = cumulation;
264 if (buf != null) {
265 // Directly set this to null, so we are sure we not access it in any other method here anymore.
266 cumulation = null;
267 numReads = 0;
268 int readable = buf.readableBytes();
269 if (readable > 0) {
270 ctx.fireChannelRead(buf);
271 ctx.fireChannelReadComplete();
272 } else {
273 buf.release();
274 }
275 }
276 handlerRemoved0(ctx);
277 }
278
279 /**
280 * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
281 * events anymore.
282 */
283 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
284
285 @Override
286 public void channelRead(ChannelHandlerContext ctx, Object input) throws Exception {
287 if (decodeState == STATE_INIT) {
288 do {
289 if (input instanceof ByteBuf) {
290 selfFiredChannelRead = true;
291 CodecOutputList out = CodecOutputList.newInstance();
292 try {
293 first = cumulation == null;
294 cumulation = cumulator.cumulate(ctx.alloc(),
295 first ? EMPTY_BUFFER : cumulation, (ByteBuf) input);
296 callDecode(ctx, cumulation, out);
297 } catch (DecoderException e) {
298 throw e;
299 } catch (Exception e) {
300 throw new DecoderException(e);
301 } finally {
302 try {
303 if (cumulation != null && !cumulation.isReadable()) {
304 numReads = 0;
305 try {
306 cumulation.release();
307 } catch (IllegalReferenceCountException e) {
308 //noinspection ThrowFromFinallyBlock
309 throw new IllegalReferenceCountException(
310 getClass().getSimpleName() +
311 "#decode() might have released its input buffer, " +
312 "or passed it down the pipeline without a retain() call, " +
313 "which is not allowed.", e);
314 }
315 cumulation = null;
316 } else if (++numReads >= discardAfterReads) {
317 // We did enough reads already try to discard some bytes, so we not risk to see a OOME.
318 // See https://github.com/netty/netty/issues/4275
319 numReads = 0;
320 discardSomeReadBytes();
321 }
322
323 int size = out.size();
324 firedChannelRead |= out.insertSinceRecycled();
325 fireChannelRead(ctx, out, size);
326 } finally {
327 out.recycle();
328 }
329 }
330 } else {
331 ctx.fireChannelRead(input);
332 }
333 } while (inputMessages != null && (input = inputMessages.poll()) != null);
334 } else {
335 // Reentrant call. Bail out here and let original call process our message.
336 if (inputMessages == null) {
337 inputMessages = new ArrayDeque<>(2);
338 }
339 inputMessages.offer(input);
340 }
341 }
342
343 /**
344 * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
345 */
346 static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
347 if (msgs instanceof CodecOutputList) {
348 fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
349 } else {
350 for (int i = 0; i < numElements; i++) {
351 ctx.fireChannelRead(msgs.get(i));
352 }
353 }
354 }
355
356 /**
357 * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
358 */
359 static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
360 for (int i = 0; i < numElements; i ++) {
361 ctx.fireChannelRead(msgs.getUnsafe(i));
362 }
363 }
364
365 @Override
366 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
367 numReads = 0;
368 discardSomeReadBytes();
369 if (selfFiredChannelRead && !firedChannelRead && !ctx.channel().config().isAutoRead()) {
370 ctx.read();
371 }
372 firedChannelRead = false;
373 selfFiredChannelRead = false;
374 ctx.fireChannelReadComplete();
375 }
376
377 protected final void discardSomeReadBytes() {
378 if (cumulation != null && !first && cumulation.refCnt() == 1) {
379 // discard some bytes if possible to make more room in the
380 // buffer but only if the refCnt == 1 as otherwise the user may have
381 // used slice().retain() or duplicate().retain().
382 //
383 // See:
384 // - https://github.com/netty/netty/issues/2327
385 // - https://github.com/netty/netty/issues/1764
386 cumulation.discardSomeReadBytes();
387 }
388 }
389
390 @Override
391 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
392 channelInputClosed(ctx, true);
393 }
394
395 @Override
396 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
397 if (evt instanceof ChannelInputShutdownEvent) {
398 // The decodeLast method is invoked when a channelInactive event is encountered.
399 // This method is responsible for ending requests in some situations and must be called
400 // when the input has been shutdown.
401 channelInputClosed(ctx, false);
402 }
403 super.userEventTriggered(ctx, evt);
404 }
405
406 private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
407 CodecOutputList out = CodecOutputList.newInstance();
408 try {
409 channelInputClosed(ctx, out);
410 } catch (DecoderException e) {
411 throw e;
412 } catch (Exception e) {
413 throw new DecoderException(e);
414 } finally {
415 try {
416 if (cumulation != null) {
417 cumulation.release();
418 cumulation = null;
419 }
420 int size = out.size();
421 fireChannelRead(ctx, out, size);
422 if (size > 0) {
423 // Something was read, call fireChannelReadComplete()
424 ctx.fireChannelReadComplete();
425 }
426 if (callChannelInactive) {
427 ctx.fireChannelInactive();
428 }
429 } finally {
430 // Recycle in all cases
431 out.recycle();
432 }
433 }
434 }
435
436 /**
437 * Called when the input of the channel was closed which may be because it changed to inactive or because of
438 * {@link ChannelInputShutdownEvent}.
439 */
440 void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
441 if (cumulation != null) {
442 callDecode(ctx, cumulation, out);
443 // If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would
444 // be unexpected.
445 if (!ctx.isRemoved()) {
446 // Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...).
447 // See https://github.com/netty/netty/issues/10802.
448 ByteBuf buffer = cumulation == null ? Unpooled.EMPTY_BUFFER : cumulation;
449 decodeLast(ctx, buffer, out);
450 }
451 } else {
452 decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
453 }
454 }
455
456 /**
457 * Called once data should be decoded from the given {@link ByteBuf}. This method will call
458 * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
459 *
460 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
461 * @param in the {@link ByteBuf} from which to read data
462 * @param out the {@link List} to which decoded messages should be added
463 */
464 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
465 try {
466 while (in.isReadable()) {
467 final int outSize = out.size();
468
469 if (outSize > 0) {
470 fireChannelRead(ctx, out, outSize);
471 out.clear();
472
473 // Check if this handler was removed before continuing with decoding.
474 // If it was removed, it is not safe to continue to operate on the buffer.
475 //
476 // See:
477 // - https://github.com/netty/netty/issues/4635
478 if (ctx.isRemoved()) {
479 break;
480 }
481 }
482
483 int oldInputLength = in.readableBytes();
484 decodeRemovalReentryProtection(ctx, in, out);
485
486 // Check if this handler was removed before continuing the loop.
487 // If it was removed, it is not safe to continue to operate on the buffer.
488 //
489 // See https://github.com/netty/netty/issues/1664
490 if (ctx.isRemoved()) {
491 break;
492 }
493
494 if (out.isEmpty()) {
495 if (oldInputLength == in.readableBytes()) {
496 break;
497 } else {
498 continue;
499 }
500 }
501
502 if (oldInputLength == in.readableBytes()) {
503 throw new DecoderException(
504 StringUtil.simpleClassName(getClass()) +
505 ".decode() did not read anything but decoded a message.");
506 }
507
508 if (isSingleDecode()) {
509 break;
510 }
511 }
512 } catch (DecoderException e) {
513 throw e;
514 } catch (Exception cause) {
515 throw new DecoderException(cause);
516 }
517 }
518
519 /**
520 * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
521 * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
522 * {@link ByteBuf}.
523 *
524 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
525 * @param in the {@link ByteBuf} from which to read data
526 * @param out the {@link List} to which decoded messages should be added
527 * @throws Exception is thrown if an error occurs
528 */
529 protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
530
531 /**
532 * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
533 * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
534 * {@link ByteBuf}.
535 *
536 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
537 * @param in the {@link ByteBuf} from which to read data
538 * @param out the {@link List} to which decoded messages should be added
539 * @throws Exception is thrown if an error occurs
540 */
541 final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
542 throws Exception {
543 decodeState = STATE_CALLING_CHILD_DECODE;
544 try {
545 decode(ctx, in, out);
546 } finally {
547 if (inputMessages == null || inputMessages.isEmpty()) {
548 boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
549 decodeState = STATE_INIT;
550 if (removePending) {
551 fireChannelRead(ctx, out, out.size());
552 out.clear();
553 handlerRemoved(ctx);
554 }
555 }
556 }
557 }
558
559 /**
560 * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
561 * {@link #channelInactive(ChannelHandlerContext)} was triggered.
562 *
563 * By default, this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
564 * override this for some special cleanup operation.
565 */
566 protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
567 if (in.isReadable()) {
568 // Only call decode() if there is something left in the buffer to decode.
569 // See https://github.com/netty/netty/issues/4386
570 decodeRemovalReentryProtection(ctx, in, out);
571 }
572 }
573
574 static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
575 int oldBytes = oldCumulation.readableBytes();
576 int newBytes = in.readableBytes();
577 int totalBytes = oldBytes + newBytes;
578 ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, Integer.MAX_VALUE));
579 ByteBuf toRelease = newCumulation;
580 try {
581 // This avoids redundant checks and stack depth compared to calling writeBytes(...)
582 newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
583 .setBytes(oldBytes, in, in.readerIndex(), newBytes)
584 .writerIndex(totalBytes);
585 in.readerIndex(in.writerIndex());
586 toRelease = oldCumulation;
587 return newCumulation;
588 } finally {
589 toRelease.release();
590 }
591 }
592
593 /**
594 * Cumulate {@link ByteBuf}s.
595 */
596 public interface Cumulator {
597 /**
598 * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
599 * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
600 * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
601 */
602 ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
603 }
604 }