1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.codec;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.CompositeByteBuf;
21 import io.netty.buffer.ReadOnlyByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.channel.socket.ChannelInputShutdownEvent;
26 import io.netty.util.internal.StringUtil;
27
28 import java.util.List;
29
30 /**
31 * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
32 * other Message type.
33 *
34 * For example here is an implementation which reads all readable bytes from
35 * the input {@link ByteBuf} and create a new {@link ByteBuf}.
36 *
37 * <pre>
38 * public class SquareDecoder extends {@link ByteToMessageDecoder} {
39 * {@code @Override}
40 * public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List<Object> out)
41 * throws {@link Exception} {
42 * out.add(in.readBytes(in.readableBytes()));
43 * }
44 * }
45 * </pre>
46 *
47 * <h3>Frame detection</h3>
48 * <p>
49 * Generally frame detection should be handled earlier in the pipeline by adding a
50 * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
51 * or {@link LineBasedFrameDecoder}.
52 * <p>
53 * If a custom frame decoder is required, then one needs to be careful when implementing
54 * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
55 * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
56 * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
57 * <p>
58 * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
59 * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
60 * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
61 * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
62 * <h3>Pitfalls</h3>
63 * <p>
64 * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
65 * annotated with {@link @Sharable}.
66 * <p>
67 * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
68 * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
69 * to avoid leaking memory.
70 */
71 public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
72
73 /**
74 * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
75 */
76 public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
77 @SuppressWarnings("deprecation")
78 @Override
79 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
80 final ByteBuf buffer;
81 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
82 || cumulation.refCnt() > 1 || cumulation instanceof ReadOnlyByteBuf) {
83 // Expand cumulation (by replace it) when either there is not more room in the buffer
84 // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
85 // duplicate().retain() or if its read-only.
86 //
87 // See:
88 // - https://github.com/netty/netty/issues/2327
89 // - https://github.com/netty/netty/issues/1764
90 buffer = expandCumulation(alloc, cumulation, in.readableBytes());
91 } else {
92 buffer = cumulation;
93 }
94 buffer.writeBytes(in);
95 in.release();
96 return buffer;
97 }
98 };
99
100 /**
101 * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
102 * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
103 * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
104 */
105 public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
106 @Override
107 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
108 ByteBuf buffer;
109 if (cumulation.refCnt() > 1) {
110 // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user
111 // use slice().retain() or duplicate().retain().
112 //
113 // See:
114 // - https://github.com/netty/netty/issues/2327
115 // - https://github.com/netty/netty/issues/1764
116 buffer = expandCumulation(alloc, cumulation, in.readableBytes());
117 buffer.writeBytes(in);
118 in.release();
119 } else {
120 CompositeByteBuf composite;
121 if (cumulation instanceof CompositeByteBuf) {
122 composite = (CompositeByteBuf) cumulation;
123 } else {
124 composite = alloc.compositeBuffer(Integer.MAX_VALUE);
125 composite.addComponent(true, cumulation);
126 }
127 composite.addComponent(true, in);
128 buffer = composite;
129 }
130 return buffer;
131 }
132 };
133
134 private static final byte STATE_INIT = 0;
135 private static final byte STATE_CALLING_CHILD_DECODE = 1;
136 private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
137
138 ByteBuf cumulation;
139 private Cumulator cumulator = MERGE_CUMULATOR;
140 private boolean singleDecode;
141 private boolean decodeWasNull;
142 private boolean first;
143 /**
144 * A bitmask where the bits are defined as
145 * <ul>
146 * <li>{@link #STATE_INIT}</li>
147 * <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
148 * <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
149 * </ul>
150 */
151 private byte decodeState = STATE_INIT;
152 private int discardAfterReads = 16;
153 private int numReads;
154
155 protected ByteToMessageDecoder() {
156 ensureNotSharable();
157 }
158
159 /**
160 * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
161 * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
162 *
163 * Default is {@code false} as this has performance impacts.
164 */
165 public void setSingleDecode(boolean singleDecode) {
166 this.singleDecode = singleDecode;
167 }
168
169 /**
170 * If {@code true} then only one message is decoded on each
171 * {@link #channelRead(ChannelHandlerContext, Object)} call.
172 *
173 * Default is {@code false} as this has performance impacts.
174 */
175 public boolean isSingleDecode() {
176 return singleDecode;
177 }
178
179 /**
180 * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
181 */
182 public void setCumulator(Cumulator cumulator) {
183 if (cumulator == null) {
184 throw new NullPointerException("cumulator");
185 }
186 this.cumulator = cumulator;
187 }
188
189 /**
190 * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
191 * The default is {@code 16}.
192 */
193 public void setDiscardAfterReads(int discardAfterReads) {
194 if (discardAfterReads <= 0) {
195 throw new IllegalArgumentException("discardAfterReads must be > 0");
196 }
197 this.discardAfterReads = discardAfterReads;
198 }
199
200 /**
201 * Returns the actual number of readable bytes in the internal cumulative
202 * buffer of this decoder. You usually do not need to rely on this value
203 * to write a decoder. Use it only when you must use it at your own risk.
204 * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
205 */
206 protected int actualReadableBytes() {
207 return internalBuffer().readableBytes();
208 }
209
210 /**
211 * Returns the internal cumulative buffer of this decoder. You usually
212 * do not need to access the internal buffer directly to write a decoder.
213 * Use it only when you must use it at your own risk.
214 */
215 protected ByteBuf internalBuffer() {
216 if (cumulation != null) {
217 return cumulation;
218 } else {
219 return Unpooled.EMPTY_BUFFER;
220 }
221 }
222
223 @Override
224 public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
225 if (decodeState == STATE_CALLING_CHILD_DECODE) {
226 decodeState = STATE_HANDLER_REMOVED_PENDING;
227 return;
228 }
229 ByteBuf buf = cumulation;
230 if (buf != null) {
231 // Directly set this to null so we are sure we not access it in any other method here anymore.
232 cumulation = null;
233
234 int readable = buf.readableBytes();
235 if (readable > 0) {
236 ByteBuf bytes = buf.readBytes(readable);
237 buf.release();
238 ctx.fireChannelRead(bytes);
239 } else {
240 buf.release();
241 }
242
243 numReads = 0;
244 ctx.fireChannelReadComplete();
245 }
246 handlerRemoved0(ctx);
247 }
248
249 /**
250 * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
251 * events anymore.
252 */
253 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
254
255 @Override
256 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
257 if (msg instanceof ByteBuf) {
258 CodecOutputList out = CodecOutputList.newInstance();
259 try {
260 ByteBuf data = (ByteBuf) msg;
261 first = cumulation == null;
262 if (first) {
263 cumulation = data;
264 } else {
265 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
266 }
267 callDecode(ctx, cumulation, out);
268 } catch (DecoderException e) {
269 throw e;
270 } catch (Exception e) {
271 throw new DecoderException(e);
272 } finally {
273 if (cumulation != null && !cumulation.isReadable()) {
274 numReads = 0;
275 cumulation.release();
276 cumulation = null;
277 } else if (++ numReads >= discardAfterReads) {
278 // We did enough reads already try to discard some bytes so we not risk to see a OOME.
279 // See https://github.com/netty/netty/issues/4275
280 numReads = 0;
281 discardSomeReadBytes();
282 }
283
284 int size = out.size();
285 decodeWasNull = !out.insertSinceRecycled();
286 fireChannelRead(ctx, out, size);
287 out.recycle();
288 }
289 } else {
290 ctx.fireChannelRead(msg);
291 }
292 }
293
294 /**
295 * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
296 */
297 static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
298 if (msgs instanceof CodecOutputList) {
299 fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
300 } else {
301 for (int i = 0; i < numElements; i++) {
302 ctx.fireChannelRead(msgs.get(i));
303 }
304 }
305 }
306
307 /**
308 * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
309 */
310 static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
311 for (int i = 0; i < numElements; i ++) {
312 ctx.fireChannelRead(msgs.getUnsafe(i));
313 }
314 }
315
316 @Override
317 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
318 numReads = 0;
319 discardSomeReadBytes();
320 if (decodeWasNull) {
321 decodeWasNull = false;
322 if (!ctx.channel().config().isAutoRead()) {
323 ctx.read();
324 }
325 }
326 ctx.fireChannelReadComplete();
327 }
328
329 protected final void discardSomeReadBytes() {
330 if (cumulation != null && !first && cumulation.refCnt() == 1) {
331 // discard some bytes if possible to make more room in the
332 // buffer but only if the refCnt == 1 as otherwise the user may have
333 // used slice().retain() or duplicate().retain().
334 //
335 // See:
336 // - https://github.com/netty/netty/issues/2327
337 // - https://github.com/netty/netty/issues/1764
338 cumulation.discardSomeReadBytes();
339 }
340 }
341
342 @Override
343 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
344 channelInputClosed(ctx, true);
345 }
346
347 @Override
348 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
349 if (evt instanceof ChannelInputShutdownEvent) {
350 // The decodeLast method is invoked when a channelInactive event is encountered.
351 // This method is responsible for ending requests in some situations and must be called
352 // when the input has been shutdown.
353 channelInputClosed(ctx, false);
354 }
355 super.userEventTriggered(ctx, evt);
356 }
357
358 private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
359 CodecOutputList out = CodecOutputList.newInstance();
360 try {
361 channelInputClosed(ctx, out);
362 } catch (DecoderException e) {
363 throw e;
364 } catch (Exception e) {
365 throw new DecoderException(e);
366 } finally {
367 try {
368 if (cumulation != null) {
369 cumulation.release();
370 cumulation = null;
371 }
372 int size = out.size();
373 fireChannelRead(ctx, out, size);
374 if (size > 0) {
375 // Something was read, call fireChannelReadComplete()
376 ctx.fireChannelReadComplete();
377 }
378 if (callChannelInactive) {
379 ctx.fireChannelInactive();
380 }
381 } finally {
382 // Recycle in all cases
383 out.recycle();
384 }
385 }
386 }
387
388 /**
389 * Called when the input of the channel was closed which may be because it changed to inactive or because of
390 * {@link ChannelInputShutdownEvent}.
391 */
392 void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
393 if (cumulation != null) {
394 callDecode(ctx, cumulation, out);
395 decodeLast(ctx, cumulation, out);
396 } else {
397 decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
398 }
399 }
400
401 /**
402 * Called once data should be decoded from the given {@link ByteBuf}. This method will call
403 * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
404 *
405 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
406 * @param in the {@link ByteBuf} from which to read data
407 * @param out the {@link List} to which decoded messages should be added
408 */
409 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
410 try {
411 while (in.isReadable()) {
412 int outSize = out.size();
413
414 if (outSize > 0) {
415 fireChannelRead(ctx, out, outSize);
416 out.clear();
417
418 // Check if this handler was removed before continuing with decoding.
419 // If it was removed, it is not safe to continue to operate on the buffer.
420 //
421 // See:
422 // - https://github.com/netty/netty/issues/4635
423 if (ctx.isRemoved()) {
424 break;
425 }
426 outSize = 0;
427 }
428
429 int oldInputLength = in.readableBytes();
430 decodeRemovalReentryProtection(ctx, in, out);
431
432 // Check if this handler was removed before continuing the loop.
433 // If it was removed, it is not safe to continue to operate on the buffer.
434 //
435 // See https://github.com/netty/netty/issues/1664
436 if (ctx.isRemoved()) {
437 break;
438 }
439
440 if (outSize == out.size()) {
441 if (oldInputLength == in.readableBytes()) {
442 break;
443 } else {
444 continue;
445 }
446 }
447
448 if (oldInputLength == in.readableBytes()) {
449 throw new DecoderException(
450 StringUtil.simpleClassName(getClass()) +
451 ".decode() did not read anything but decoded a message.");
452 }
453
454 if (isSingleDecode()) {
455 break;
456 }
457 }
458 } catch (DecoderException e) {
459 throw e;
460 } catch (Exception cause) {
461 throw new DecoderException(cause);
462 }
463 }
464
465 /**
466 * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
467 * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
468 * {@link ByteBuf}.
469 *
470 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
471 * @param in the {@link ByteBuf} from which to read data
472 * @param out the {@link List} to which decoded messages should be added
473 * @throws Exception is thrown if an error occurs
474 */
475 protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
476
477 /**
478 * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
479 * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
480 * {@link ByteBuf}.
481 *
482 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
483 * @param in the {@link ByteBuf} from which to read data
484 * @param out the {@link List} to which decoded messages should be added
485 * @throws Exception is thrown if an error occurs
486 */
487 final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
488 throws Exception {
489 decodeState = STATE_CALLING_CHILD_DECODE;
490 try {
491 decode(ctx, in, out);
492 } finally {
493 boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
494 decodeState = STATE_INIT;
495 if (removePending) {
496 handlerRemoved(ctx);
497 }
498 }
499 }
500
501 /**
502 * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
503 * {@link #channelInactive(ChannelHandlerContext)} was triggered.
504 *
505 * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
506 * override this for some special cleanup operation.
507 */
508 protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
509 if (in.isReadable()) {
510 // Only call decode() if there is something left in the buffer to decode.
511 // See https://github.com/netty/netty/issues/4386
512 decodeRemovalReentryProtection(ctx, in, out);
513 }
514 }
515
516 static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
517 ByteBuf oldCumulation = cumulation;
518 cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
519 cumulation.writeBytes(oldCumulation);
520 oldCumulation.release();
521 return cumulation;
522 }
523
524 /**
525 * Cumulate {@link ByteBuf}s.
526 */
527 public interface Cumulator {
528 /**
529 * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
530 * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
531 * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
532 */
533 ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
534 }
535 }