1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.CompositeByteBuf;
21 import io.netty.buffer.ReadOnlyByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.channel.socket.ChannelInputShutdownEvent;
26 import io.netty.util.internal.StringUtil;
27
28 import java.util.List;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
72
73
74
75
76 public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
77 @SuppressWarnings("deprecation")
78 @Override
79 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
80 final ByteBuf buffer;
81 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
82 || cumulation.refCnt() > 1 || cumulation instanceof ReadOnlyByteBuf) {
83
84
85
86
87
88
89
90 buffer = expandCumulation(alloc, cumulation, in.readableBytes());
91 } else {
92 buffer = cumulation;
93 }
94 buffer.writeBytes(in);
95 in.release();
96 return buffer;
97 }
98 };
99
100
101
102
103
104
105 public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
106 @Override
107 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
108 ByteBuf buffer;
109 if (cumulation.refCnt() > 1) {
110
111
112
113
114
115
116 buffer = expandCumulation(alloc, cumulation, in.readableBytes());
117 buffer.writeBytes(in);
118 in.release();
119 } else {
120 CompositeByteBuf composite;
121 if (cumulation instanceof CompositeByteBuf) {
122 composite = (CompositeByteBuf) cumulation;
123 } else {
124 composite = alloc.compositeBuffer(Integer.MAX_VALUE);
125 composite.addComponent(true, cumulation);
126 }
127 composite.addComponent(true, in);
128 buffer = composite;
129 }
130 return buffer;
131 }
132 };
133
134 private static final byte STATE_INIT = 0;
135 private static final byte STATE_CALLING_CHILD_DECODE = 1;
136 private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
137
138 ByteBuf cumulation;
139 private Cumulator cumulator = MERGE_CUMULATOR;
140 private boolean singleDecode;
141 private boolean decodeWasNull;
142 private boolean first;
143
144
145
146
147
148
149
150
151 private byte decodeState = STATE_INIT;
152 private int discardAfterReads = 16;
153 private int numReads;
154
155 protected ByteToMessageDecoder() {
156 ensureNotSharable();
157 }
158
159
160
161
162
163
164
165 public void setSingleDecode(boolean singleDecode) {
166 this.singleDecode = singleDecode;
167 }
168
169
170
171
172
173
174
175 public boolean isSingleDecode() {
176 return singleDecode;
177 }
178
179
180
181
182 public void setCumulator(Cumulator cumulator) {
183 if (cumulator == null) {
184 throw new NullPointerException("cumulator");
185 }
186 this.cumulator = cumulator;
187 }
188
189
190
191
192
193 public void setDiscardAfterReads(int discardAfterReads) {
194 if (discardAfterReads <= 0) {
195 throw new IllegalArgumentException("discardAfterReads must be > 0");
196 }
197 this.discardAfterReads = discardAfterReads;
198 }
199
200
201
202
203
204
205
206 protected int actualReadableBytes() {
207 return internalBuffer().readableBytes();
208 }
209
210
211
212
213
214
215 protected ByteBuf internalBuffer() {
216 if (cumulation != null) {
217 return cumulation;
218 } else {
219 return Unpooled.EMPTY_BUFFER;
220 }
221 }
222
223 @Override
224 public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
225 if (decodeState == STATE_CALLING_CHILD_DECODE) {
226 decodeState = STATE_HANDLER_REMOVED_PENDING;
227 return;
228 }
229 ByteBuf buf = cumulation;
230 if (buf != null) {
231
232 cumulation = null;
233
234 int readable = buf.readableBytes();
235 if (readable > 0) {
236 ByteBuf bytes = buf.readBytes(readable);
237 buf.release();
238 ctx.fireChannelRead(bytes);
239 } else {
240 buf.release();
241 }
242
243 numReads = 0;
244 ctx.fireChannelReadComplete();
245 }
246 handlerRemoved0(ctx);
247 }
248
249
250
251
252
253 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
254
255 @Override
256 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
257 if (msg instanceof ByteBuf) {
258 CodecOutputList out = CodecOutputList.newInstance();
259 try {
260 ByteBuf data = (ByteBuf) msg;
261 first = cumulation == null;
262 if (first) {
263 cumulation = data;
264 } else {
265 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
266 }
267 callDecode(ctx, cumulation, out);
268 } catch (DecoderException e) {
269 throw e;
270 } catch (Exception e) {
271 throw new DecoderException(e);
272 } finally {
273 if (cumulation != null && !cumulation.isReadable()) {
274 numReads = 0;
275 cumulation.release();
276 cumulation = null;
277 } else if (++ numReads >= discardAfterReads) {
278
279
280 numReads = 0;
281 discardSomeReadBytes();
282 }
283
284 int size = out.size();
285 decodeWasNull = !out.insertSinceRecycled();
286 fireChannelRead(ctx, out, size);
287 out.recycle();
288 }
289 } else {
290 ctx.fireChannelRead(msg);
291 }
292 }
293
294
295
296
297 static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
298 if (msgs instanceof CodecOutputList) {
299 fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
300 } else {
301 for (int i = 0; i < numElements; i++) {
302 ctx.fireChannelRead(msgs.get(i));
303 }
304 }
305 }
306
307
308
309
310 static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
311 for (int i = 0; i < numElements; i ++) {
312 ctx.fireChannelRead(msgs.getUnsafe(i));
313 }
314 }
315
316 @Override
317 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
318 numReads = 0;
319 discardSomeReadBytes();
320 if (decodeWasNull) {
321 decodeWasNull = false;
322 if (!ctx.channel().config().isAutoRead()) {
323 ctx.read();
324 }
325 }
326 ctx.fireChannelReadComplete();
327 }
328
329 protected final void discardSomeReadBytes() {
330 if (cumulation != null && !first && cumulation.refCnt() == 1) {
331
332
333
334
335
336
337
338 cumulation.discardSomeReadBytes();
339 }
340 }
341
342 @Override
343 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
344 channelInputClosed(ctx, true);
345 }
346
347 @Override
348 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
349 if (evt instanceof ChannelInputShutdownEvent) {
350
351
352
353 channelInputClosed(ctx, false);
354 }
355 super.userEventTriggered(ctx, evt);
356 }
357
358 private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
359 CodecOutputList out = CodecOutputList.newInstance();
360 try {
361 channelInputClosed(ctx, out);
362 } catch (DecoderException e) {
363 throw e;
364 } catch (Exception e) {
365 throw new DecoderException(e);
366 } finally {
367 try {
368 if (cumulation != null) {
369 cumulation.release();
370 cumulation = null;
371 }
372 int size = out.size();
373 fireChannelRead(ctx, out, size);
374 if (size > 0) {
375
376 ctx.fireChannelReadComplete();
377 }
378 if (callChannelInactive) {
379 ctx.fireChannelInactive();
380 }
381 } finally {
382
383 out.recycle();
384 }
385 }
386 }
387
388
389
390
391
392 void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
393 if (cumulation != null) {
394 callDecode(ctx, cumulation, out);
395 decodeLast(ctx, cumulation, out);
396 } else {
397 decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
398 }
399 }
400
401
402
403
404
405
406
407
408
409 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
410 try {
411 while (in.isReadable()) {
412 int outSize = out.size();
413
414 if (outSize > 0) {
415 fireChannelRead(ctx, out, outSize);
416 out.clear();
417
418
419
420
421
422
423 if (ctx.isRemoved()) {
424 break;
425 }
426 outSize = 0;
427 }
428
429 int oldInputLength = in.readableBytes();
430 decodeRemovalReentryProtection(ctx, in, out);
431
432
433
434
435
436 if (ctx.isRemoved()) {
437 break;
438 }
439
440 if (outSize == out.size()) {
441 if (oldInputLength == in.readableBytes()) {
442 break;
443 } else {
444 continue;
445 }
446 }
447
448 if (oldInputLength == in.readableBytes()) {
449 throw new DecoderException(
450 StringUtil.simpleClassName(getClass()) +
451 ".decode() did not read anything but decoded a message.");
452 }
453
454 if (isSingleDecode()) {
455 break;
456 }
457 }
458 } catch (DecoderException e) {
459 throw e;
460 } catch (Exception cause) {
461 throw new DecoderException(cause);
462 }
463 }
464
465
466
467
468
469
470
471
472
473
474
475 protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
476
477
478
479
480
481
482
483
484
485
486
487 final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
488 throws Exception {
489 decodeState = STATE_CALLING_CHILD_DECODE;
490 try {
491 decode(ctx, in, out);
492 } finally {
493 boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
494 decodeState = STATE_INIT;
495 if (removePending) {
496 handlerRemoved(ctx);
497 }
498 }
499 }
500
501
502
503
504
505
506
507
508 protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
509 if (in.isReadable()) {
510
511
512 decodeRemovalReentryProtection(ctx, in, out);
513 }
514 }
515
516 static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
517 ByteBuf oldCumulation = cumulation;
518 cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
519 cumulation.writeBytes(oldCumulation);
520 oldCumulation.release();
521 return cumulation;
522 }
523
524
525
526
527 public interface Cumulator {
528
529
530
531
532
533 ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
534 }
535 }