1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.codec;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.CompositeByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelConfig;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.channel.socket.ChannelInputShutdownEvent;
26 import io.netty.util.IllegalReferenceCountException;
27 import io.netty.util.internal.ObjectUtil;
28 import io.netty.util.internal.StringUtil;
29
30 import java.util.List;
31
32 import static io.netty.util.internal.ObjectUtil.checkPositive;
33 import static java.lang.Integer.MAX_VALUE;
34
35 /**
 * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to
 * another message type.
 *
 * For example, here is an implementation which reads all readable bytes from
 * the input {@link ByteBuf} and creates a new {@link ByteBuf}.
41 *
42 * <pre>
43 * public class SquareDecoder extends {@link ByteToMessageDecoder} {
44 * {@code @Override}
 *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List&lt;Object&gt; out)
46 * throws {@link Exception} {
47 * out.add(in.readBytes(in.readableBytes()));
48 * }
49 * }
50 * </pre>
51 *
52 * <h3>Frame detection</h3>
53 * <p>
54 * Generally frame detection should be handled earlier in the pipeline by adding a
55 * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
56 * or {@link LineBasedFrameDecoder}.
57 * <p>
58 * If a custom frame decoder is required, then one needs to be careful when implementing
59 * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
60 * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
61 * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
62 * <p>
63 * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
64 * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
65 * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
66 * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
67 * <h3>Pitfalls</h3>
68 * <p>
 * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
 * be annotated with {@code @Sharable}.
71 * <p>
72 * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
73 * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
74 * to avoid leaking memory.
75 */
76 public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
77
78 /**
79 * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
80 */
81 public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
82 @Override
83 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
84 if (cumulation == in) {
85 // when the in buffer is the same as the cumulation it is doubly retained, release it once
86 in.release();
87 return cumulation;
88 }
89 if (!cumulation.isReadable() && in.isContiguous()) {
90 // If cumulation is empty and input buffer is contiguous, use it directly
91 cumulation.release();
92 return in;
93 }
94 try {
95 final int required = in.readableBytes();
96 if (required > cumulation.maxWritableBytes() ||
97 required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 ||
98 cumulation.isReadOnly()) {
99 // Expand cumulation (by replacing it) under the following conditions:
100 // - cumulation cannot be resized to accommodate the additional data
101 // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
102 // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
103 return expandCumulation(alloc, cumulation, in);
104 }
105 cumulation.writeBytes(in, in.readerIndex(), required);
106 in.readerIndex(in.writerIndex());
107 return cumulation;
108 } finally {
109 // We must release in all cases as otherwise it may produce a leak if writeBytes(...) throw
110 // for whatever release (for example because of OutOfMemoryError)
111 in.release();
112 }
113 }
114 };
115
116 /**
117 * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
118 * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
119 * and the decoder implementation this may be slower than just use the {@link #MERGE_CUMULATOR}.
120 */
121 public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
122 @Override
123 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
124 if (cumulation == in) {
125 // when the in buffer is the same as the cumulation it is doubly retained, release it once
126 in.release();
127 return cumulation;
128 }
129 if (!cumulation.isReadable()) {
130 cumulation.release();
131 return in;
132 }
133 CompositeByteBuf composite = null;
134 try {
135 if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
136 composite = (CompositeByteBuf) cumulation;
137 // Writer index must equal capacity if we are going to "write"
138 // new components to the end
139 if (composite.writerIndex() != composite.capacity()) {
140 composite.capacity(composite.writerIndex());
141 }
142 } else {
143 composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
144 }
145 composite.addFlattenedComponents(true, in);
146 in = null;
147 return composite;
148 } finally {
149 if (in != null) {
150 // We must release if the ownership was not transferred as otherwise it may produce a leak
151 in.release();
152 // Also release any new buffer allocated if we're not returning it
153 if (composite != null && composite != cumulation) {
154 composite.release();
155 }
156 }
157 }
158 }
159 };
160
161 private static final byte STATE_INIT = 0;
162 private static final byte STATE_CALLING_CHILD_DECODE = 1;
163 private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
164
165 ByteBuf cumulation;
166 private Cumulator cumulator = MERGE_CUMULATOR;
167 private boolean singleDecode;
168 private boolean first;
169
170 /**
171 * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
172 * when {@link ChannelConfig#isAutoRead()} is {@code false}.
173 */
174 private boolean firedChannelRead;
175
176 private boolean selfFiredChannelRead;
177
178 /**
179 * A bitmask where the bits are defined as
180 * <ul>
181 * <li>{@link #STATE_INIT}</li>
182 * <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
183 * <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
184 * </ul>
185 */
186 private byte decodeState = STATE_INIT;
187 private int discardAfterReads = 16;
188 private int numReads;
189
/**
 * Creates a new decoder. {@code ensureNotSharable()} is invoked first, as sub-classes of this decoder
 * must not be sharable.
 */
protected ByteToMessageDecoder() {
    ensureNotSharable();
}
193
194 /**
195 * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
196 * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
197 *
198 * Default is {@code false} as this has performance impacts.
199 */
200 public void setSingleDecode(boolean singleDecode) {
201 this.singleDecode = singleDecode;
202 }
203
204 /**
205 * If {@code true} then only one message is decoded on each
206 * {@link #channelRead(ChannelHandlerContext, Object)} call.
207 *
208 * Default is {@code false} as this has performance impacts.
209 */
210 public boolean isSingleDecode() {
211 return singleDecode;
212 }
213
214 /**
215 * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
216 */
217 public void setCumulator(Cumulator cumulator) {
218 this.cumulator = ObjectUtil.checkNotNull(cumulator, "cumulator");
219 }
220
221 /**
222 * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
223 * The default is {@code 16}.
224 */
225 public void setDiscardAfterReads(int discardAfterReads) {
226 checkPositive(discardAfterReads, "discardAfterReads");
227 this.discardAfterReads = discardAfterReads;
228 }
229
230 /**
231 * Returns the actual number of readable bytes in the internal cumulative
232 * buffer of this decoder. You usually do not need to rely on this value
233 * to write a decoder. Use it only when you must use it at your own risk.
234 * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
235 */
236 protected int actualReadableBytes() {
237 return internalBuffer().readableBytes();
238 }
239
240 /**
241 * Returns the internal cumulative buffer of this decoder. You usually
242 * do not need to access the internal buffer directly to write a decoder.
243 * Use it only when you must use it at your own risk.
244 */
245 protected ByteBuf internalBuffer() {
246 if (cumulation != null) {
247 return cumulation;
248 } else {
249 return Unpooled.EMPTY_BUFFER;
250 }
251 }
252
253 @Override
254 public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
255 if (decodeState == STATE_CALLING_CHILD_DECODE) {
256 decodeState = STATE_HANDLER_REMOVED_PENDING;
257 return;
258 }
259 ByteBuf buf = cumulation;
260 if (buf != null) {
261 // Directly set this to null, so we are sure we not access it in any other method here anymore.
262 cumulation = null;
263 numReads = 0;
264 int readable = buf.readableBytes();
265 if (readable > 0) {
266 ctx.fireChannelRead(buf);
267 ctx.fireChannelReadComplete();
268 } else {
269 buf.release();
270 }
271 }
272 handlerRemoved0(ctx);
273 }
274
275 /**
276 * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
277 * events anymore.
278 */
279 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
280
281 @Override
282 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
283 if (msg instanceof ByteBuf) {
284 selfFiredChannelRead = true;
285 CodecOutputList out = CodecOutputList.newInstance();
286 try {
287 first = cumulation == null;
288 cumulation = cumulator.cumulate(ctx.alloc(),
289 first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
290 callDecode(ctx, cumulation, out);
291 } catch (DecoderException e) {
292 throw e;
293 } catch (Exception e) {
294 throw new DecoderException(e);
295 } finally {
296 try {
297 if (cumulation != null && !cumulation.isReadable()) {
298 numReads = 0;
299 try {
300 cumulation.release();
301 } catch (IllegalReferenceCountException e) {
302 //noinspection ThrowFromFinallyBlock
303 throw new IllegalReferenceCountException(
304 getClass().getSimpleName() + "#decode() might have released its input buffer, " +
305 "or passed it down the pipeline without a retain() call, " +
306 "which is not allowed.", e);
307 }
308 cumulation = null;
309 } else if (++numReads >= discardAfterReads) {
310 // We did enough reads already try to discard some bytes, so we not risk to see a OOME.
311 // See https://github.com/netty/netty/issues/4275
312 numReads = 0;
313 discardSomeReadBytes();
314 }
315
316 int size = out.size();
317 firedChannelRead |= out.insertSinceRecycled();
318 fireChannelRead(ctx, out, size);
319 } finally {
320 out.recycle();
321 }
322 }
323 } else {
324 ctx.fireChannelRead(msg);
325 }
326 }
327
328 /**
329 * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
330 */
331 static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
332 if (msgs instanceof CodecOutputList) {
333 fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
334 } else {
335 for (int i = 0; i < numElements; i++) {
336 ctx.fireChannelRead(msgs.get(i));
337 }
338 }
339 }
340
341 /**
342 * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
343 */
344 static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
345 for (int i = 0; i < numElements; i ++) {
346 ctx.fireChannelRead(msgs.getUnsafe(i));
347 }
348 }
349
350 @Override
351 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
352 numReads = 0;
353 discardSomeReadBytes();
354 if (selfFiredChannelRead && !firedChannelRead && !ctx.channel().config().isAutoRead()) {
355 ctx.read();
356 }
357 firedChannelRead = false;
358 selfFiredChannelRead = false;
359 ctx.fireChannelReadComplete();
360 }
361
362 protected final void discardSomeReadBytes() {
363 if (cumulation != null && !first && cumulation.refCnt() == 1) {
364 // discard some bytes if possible to make more room in the
365 // buffer but only if the refCnt == 1 as otherwise the user may have
366 // used slice().retain() or duplicate().retain().
367 //
368 // See:
369 // - https://github.com/netty/netty/issues/2327
370 // - https://github.com/netty/netty/issues/1764
371 cumulation.discardSomeReadBytes();
372 }
373 }
374
375 @Override
376 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
377 channelInputClosed(ctx, true);
378 }
379
380 @Override
381 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
382 if (evt instanceof ChannelInputShutdownEvent) {
383 // The decodeLast method is invoked when a channelInactive event is encountered.
384 // This method is responsible for ending requests in some situations and must be called
385 // when the input has been shutdown.
386 channelInputClosed(ctx, false);
387 }
388 super.userEventTriggered(ctx, evt);
389 }
390
391 private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
392 CodecOutputList out = CodecOutputList.newInstance();
393 try {
394 channelInputClosed(ctx, out);
395 } catch (DecoderException e) {
396 throw e;
397 } catch (Exception e) {
398 throw new DecoderException(e);
399 } finally {
400 try {
401 if (cumulation != null) {
402 cumulation.release();
403 cumulation = null;
404 }
405 int size = out.size();
406 fireChannelRead(ctx, out, size);
407 if (size > 0) {
408 // Something was read, call fireChannelReadComplete()
409 ctx.fireChannelReadComplete();
410 }
411 if (callChannelInactive) {
412 ctx.fireChannelInactive();
413 }
414 } finally {
415 // Recycle in all cases
416 out.recycle();
417 }
418 }
419 }
420
421 /**
422 * Called when the input of the channel was closed which may be because it changed to inactive or because of
423 * {@link ChannelInputShutdownEvent}.
424 */
425 void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
426 if (cumulation != null) {
427 callDecode(ctx, cumulation, out);
428 // If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would
429 // be unexpected.
430 if (!ctx.isRemoved()) {
431 // Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...).
432 // See https://github.com/netty/netty/issues/10802.
433 ByteBuf buffer = cumulation == null ? Unpooled.EMPTY_BUFFER : cumulation;
434 decodeLast(ctx, buffer, out);
435 }
436 } else {
437 decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
438 }
439 }
440
441 /**
442 * Called once data should be decoded from the given {@link ByteBuf}. This method will call
443 * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
444 *
445 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
446 * @param in the {@link ByteBuf} from which to read data
447 * @param out the {@link List} to which decoded messages should be added
448 */
449 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
450 try {
451 while (in.isReadable()) {
452 final int outSize = out.size();
453
454 if (outSize > 0) {
455 fireChannelRead(ctx, out, outSize);
456 out.clear();
457
458 // Check if this handler was removed before continuing with decoding.
459 // If it was removed, it is not safe to continue to operate on the buffer.
460 //
461 // See:
462 // - https://github.com/netty/netty/issues/4635
463 if (ctx.isRemoved()) {
464 break;
465 }
466 }
467
468 int oldInputLength = in.readableBytes();
469 decodeRemovalReentryProtection(ctx, in, out);
470
471 // Check if this handler was removed before continuing the loop.
472 // If it was removed, it is not safe to continue to operate on the buffer.
473 //
474 // See https://github.com/netty/netty/issues/1664
475 if (ctx.isRemoved()) {
476 break;
477 }
478
479 if (out.isEmpty()) {
480 if (oldInputLength == in.readableBytes()) {
481 break;
482 } else {
483 continue;
484 }
485 }
486
487 if (oldInputLength == in.readableBytes()) {
488 throw new DecoderException(
489 StringUtil.simpleClassName(getClass()) +
490 ".decode() did not read anything but decoded a message.");
491 }
492
493 if (isSingleDecode()) {
494 break;
495 }
496 }
497 } catch (DecoderException e) {
498 throw e;
499 } catch (Exception cause) {
500 throw new DecoderException(cause);
501 }
502 }
503
504 /**
505 * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
506 * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
507 * {@link ByteBuf}.
508 *
509 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
510 * @param in the {@link ByteBuf} from which to read data
511 * @param out the {@link List} to which decoded messages should be added
512 * @throws Exception is thrown if an error occurs
513 */
514 protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
515
516 /**
517 * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
518 * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
519 * {@link ByteBuf}.
520 *
521 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
522 * @param in the {@link ByteBuf} from which to read data
523 * @param out the {@link List} to which decoded messages should be added
524 * @throws Exception is thrown if an error occurs
525 */
526 final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
527 throws Exception {
528 decodeState = STATE_CALLING_CHILD_DECODE;
529 try {
530 decode(ctx, in, out);
531 } finally {
532 boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
533 decodeState = STATE_INIT;
534 if (removePending) {
535 fireChannelRead(ctx, out, out.size());
536 out.clear();
537 handlerRemoved(ctx);
538 }
539 }
540 }
541
542 /**
543 * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
544 * {@link #channelInactive(ChannelHandlerContext)} was triggered.
545 *
546 * By default, this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
547 * override this for some special cleanup operation.
548 */
549 protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
550 if (in.isReadable()) {
551 // Only call decode() if there is something left in the buffer to decode.
552 // See https://github.com/netty/netty/issues/4386
553 decodeRemovalReentryProtection(ctx, in, out);
554 }
555 }
556
557 static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
558 int oldBytes = oldCumulation.readableBytes();
559 int newBytes = in.readableBytes();
560 int totalBytes = oldBytes + newBytes;
561 ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE));
562 ByteBuf toRelease = newCumulation;
563 try {
564 // This avoids redundant checks and stack depth compared to calling writeBytes(...)
565 newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
566 .setBytes(oldBytes, in, in.readerIndex(), newBytes)
567 .writerIndex(totalBytes);
568 in.readerIndex(in.writerIndex());
569 toRelease = oldCumulation;
570 return newCumulation;
571 } finally {
572 toRelease.release();
573 }
574 }
575
576 /**
577 * Cumulate {@link ByteBuf}s.
578 */
579 public interface Cumulator {
580 /**
581 * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
582 * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
583 * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
584 */
585 ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
586 }
587 }