1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty5.handler.codec;
16
17 import io.netty5.buffer.api.Buffer;
18 import io.netty5.buffer.api.BufferAllocator;
19 import io.netty5.buffer.api.CompositeBuffer;
20 import io.netty5.channel.ChannelHandler;
21 import io.netty5.channel.ChannelHandlerAdapter;
22 import io.netty5.channel.ChannelHandlerContext;
23 import io.netty5.channel.ChannelOption;
24 import io.netty5.channel.ChannelPipeline;
25 import io.netty5.channel.ChannelShutdownDirection;
26 import io.netty5.channel.internal.DelegatingChannelHandlerContext;
27 import io.netty5.util.Send;
28 import io.netty5.util.internal.StringUtil;
29
30 import java.util.Arrays;
31
32 import static io.netty5.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
33 import static java.util.Objects.requireNonNull;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
74
75
76
77
/**
 * {@link Cumulator} backed by the private {@code MergeCumulator}: incoming bytes are written into the
 * existing cumulation buffer, which is grown (or replaced when read-only) if it lacks space.
 */
public static final Cumulator MERGE_CUMULATOR = new MergeCumulator();

/**
 * {@link Cumulator} backed by the private {@code CompositeBufferCumulator}: incoming buffers are attached
 * to a {@link CompositeBuffer}; a buffer is only copied when it is read-only.
 */
public static final Cumulator COMPOSITE_CUMULATOR = new CompositeBufferCumulator();

// Number of channelRead calls that leave data buffered before discardSomeReadBytes() is attempted.
private final int discardAfterReads = 16;
// Strategy used to merge newly received buffers into the cumulation; never null (checked in the constructor).
private final Cumulator cumulator;

// Bytes received but not yet consumed by decode(); null while nothing is buffered.
private Buffer cumulation;
// When true, callDecode() stops after the first decode() call that produced a message.
private boolean singleDecode;
// Set by channelRead(): true when there was no cumulation and the incoming buffer was adopted directly.
private boolean first;

/**
 * Records whether any {@code channelRead} since the last {@code channelReadComplete} fired a read through
 * the wrapped context. {@code channelReadComplete} uses it to decide whether to call
 * {@link ChannelHandlerContext#read()} when {@link ChannelOption#AUTO_READ} is off.
 */
private boolean firedChannelRead;
// Counts channelRead() calls since the cumulation was last emptied or trimmed.
private int numReads;
// Wrapper around the pipeline context, created in handlerAdded(); counts fireChannelRead calls.
private ByteToMessageDecoderContext context;
100
/**
 * Creates a decoder that accumulates incoming bytes with {@link #MERGE_CUMULATOR}.
 */
protected ByteToMessageDecoder() {
    this(MERGE_CUMULATOR);
}
104
/**
 * Creates a decoder that accumulates incoming bytes with the given strategy.
 *
 * @param cumulator the {@link Cumulator} used to merge received buffers; must not be {@code null}
 * @throws NullPointerException if {@code cumulator} is {@code null}
 */
protected ByteToMessageDecoder(Cumulator cumulator) {
    this.cumulator = requireNonNull(cumulator, "cumulator");
}
108
/**
 * Always reports this handler as not sharable; the method is {@code final}, so subclasses cannot change it.
 *
 * @return {@code false}
 */
@Override
public final boolean isSharable() {
    // The decoder keeps per-instance state (cumulation buffer, wrapped context), so one instance must
    // not serve several pipelines.
    return false;
}
114
115
116
117
118
119
120
/**
 * Controls whether the decode loop stops after the first message produced per invocation.
 * When {@code true}, {@code callDecode} breaks out of its loop as soon as one {@code decode} call
 * has both consumed input and fired a read.
 *
 * @param singleDecode {@code true} to decode at most one message per {@code callDecode} invocation
 */
public void setSingleDecode(boolean singleDecode) {
    this.singleDecode = singleDecode;
}
124
125
126
127
128
129
130
/**
 * Reports whether the decode loop stops after the first message produced per invocation.
 *
 * @return the value last passed to {@link #setSingleDecode(boolean)}; {@code false} by default
 */
public boolean isSingleDecode() {
    return singleDecode;
}
134
135
136
137
138
139
140
141 protected int actualReadableBytes() {
142 return internalBuffer().readableBytes();
143 }
144
145
146
147
148
149
150
151
/**
 * Returns the internal cumulation buffer of this decoder.
 *
 * @return the buffer holding received-but-undecoded bytes, or {@code null} when nothing is currently
 *         buffered (the field is cleared once the cumulation is drained, handed off, or released)
 */
protected Buffer internalBuffer() {
    return cumulation;
}
155
156 @Override
/**
 * Wraps the pipeline context in a {@code ByteToMessageDecoderContext}, stores it for later events, and
 * then invokes the {@link #handlerAdded0(ChannelHandlerContext)} hook with the wrapped context.
 *
 * @param ctx the context this handler was added with
 * @throws Exception if {@code handlerAdded0} throws
 */
@Override
public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    context = new ByteToMessageDecoderContext(ctx);
    handlerAdded0(context);
}
161
/**
 * Hook called from {@link #handlerAdded(ChannelHandlerContext)} after the wrapped context has been
 * created. The default implementation does nothing.
 *
 * @param ctx the wrapped context created by {@code handlerAdded}
 * @throws Exception if the subclass implementation fails
 */
protected void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
}
164
165 @Override
166 public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
167 Buffer buf = cumulation;
168 if (buf != null) {
169
170 cumulation = null;
171 numReads = 0;
172 int readable = buf.readableBytes();
173 if (readable > 0) {
174 ctx.fireChannelRead(buf);
175 ctx.fireChannelReadComplete();
176 } else {
177 buf.close();
178 }
179 }
180 handlerRemoved0(context);
181 }
182
183
184
185
186
/**
 * Hook called at the end of {@link #handlerRemoved(ChannelHandlerContext)}, after any remaining
 * cumulation has been forwarded or closed. The default implementation does nothing.
 *
 * @param ctx the wrapped context that was created in {@code handlerAdded}
 * @throws Exception if the subclass implementation fails
 */
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
188
189 @Override
190 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
191 if (msg instanceof Buffer) {
192 try {
193 Buffer data = (Buffer) msg;
194 first = cumulation == null;
195 if (first) {
196 cumulation = data;
197 } else {
198 cumulation = cumulator.cumulate(ctx.bufferAllocator(), cumulation, data);
199 }
200 assert context.delegatingCtx() == ctx || ctx == context;
201
202 callDecode(context, cumulation);
203 } catch (DecoderException e) {
204 throw e;
205 } catch (Exception e) {
206 throw new DecoderException(e);
207 } finally {
208 if (cumulation != null && cumulation.readableBytes() == 0) {
209 numReads = 0;
210 if (cumulation.isAccessible()) {
211 cumulation.close();
212 }
213 cumulation = null;
214 } else if (++numReads >= discardAfterReads) {
215
216
217 numReads = 0;
218 discardSomeReadBytes();
219 }
220
221 firedChannelRead |= context.fireChannelReadCallCount() > 0;
222 context.reset();
223 }
224 } else {
225 ctx.fireChannelRead(msg);
226 }
227 }
228
229 @Override
230 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
231 numReads = 0;
232 discardSomeReadBytes();
233 if (!firedChannelRead && !ctx.channel().getOption(ChannelOption.AUTO_READ)) {
234 ctx.read();
235 }
236 firedChannelRead = false;
237 ctx.fireChannelReadComplete();
238 }
239
240 protected final void discardSomeReadBytes() {
241 if (cumulation != null && !first) {
242
243 cumulator.discardSomeReadBytes(cumulation);
244 }
245 }
246
247 @Override
248 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
249 assert context.delegatingCtx() == ctx || ctx == context;
250 channelInputClosed(context, true);
251 }
252
253 @Override
254 public void channelShutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) throws Exception {
255 ctx.fireChannelShutdown(direction);
256 if (direction == ChannelShutdownDirection.Inbound) {
257
258
259
260 assert context.delegatingCtx() == ctx || ctx == context;
261 channelInputClosed(context, false);
262 }
263 }
264
265 private void channelInputClosed(ByteToMessageDecoderContext ctx, boolean callChannelInactive) {
266 try {
267 channelInputClosed(ctx);
268 } catch (DecoderException e) {
269 throw e;
270 } catch (Exception e) {
271 throw new DecoderException(e);
272 } finally {
273 if (cumulation != null) {
274 cumulation.close();
275 cumulation = null;
276 }
277 if (ctx.fireChannelReadCallCount() > 0) {
278 ctx.reset();
279
280 ctx.fireChannelReadComplete();
281 }
282 if (callChannelInactive) {
283 ctx.fireChannelInactive();
284 }
285 }
286 }
287
288
289
290
291
292 void channelInputClosed(ByteToMessageDecoderContext ctx) throws Exception {
293 if (cumulation != null) {
294 callDecode(ctx, cumulation);
295
296
297 if (!ctx.isRemoved()) {
298
299
300 Buffer buffer = cumulation == null ? ctx.bufferAllocator().allocate(0) : cumulation;
301 decodeLast(ctx, buffer);
302 }
303 } else {
304 decodeLast(ctx, ctx.bufferAllocator().allocate(0));
305 }
306 }
307
308
309
310
311
312
313
314
315 void callDecode(ByteToMessageDecoderContext ctx, Buffer in) {
316 try {
317 while (in.readableBytes() > 0 && !ctx.isRemoved()) {
318
319 int oldInputLength = in.readableBytes();
320 int numReadCalled = ctx.fireChannelReadCallCount();
321 decodeRemovalReentryProtection(ctx, in);
322
323
324
325
326
327 if (ctx.isRemoved()) {
328 break;
329 }
330
331 if (numReadCalled == ctx.fireChannelReadCallCount()) {
332 if (oldInputLength == in.readableBytes()) {
333 break;
334 } else {
335 continue;
336 }
337 }
338
339 if (oldInputLength == in.readableBytes()) {
340 throw new DecoderException(
341 StringUtil.simpleClassName(getClass()) +
342 ".decode() did not read anything but decoded a message.");
343 }
344
345 if (isSingleDecode()) {
346 break;
347 }
348 }
349 } catch (DecoderException e) {
350 throw e;
351 } catch (Exception cause) {
352 throw new DecoderException(cause);
353 }
354 }
355
356
357
358
359
360
361
362
363
364
/**
 * Decodes from the given buffer. Called in a loop by {@code callDecode} for as long as the buffer has
 * readable bytes and each call makes progress (consumes bytes and/or fires a read through
 * {@code ctx}).
 *
 * <p>An implementation that fires a read through {@code ctx} without consuming any bytes from
 * {@code in} causes {@code callDecode} to throw a {@link DecoderException}.
 *
 * @param ctx the wrapped context of this decoder; decoded messages are fired through it
 * @param in  the buffer to read data from
 * @throws Exception if decoding fails
 */
protected abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception;
366
367
368
369
370
371
372
373
374
375
/**
 * Single call site through which {@code callDecode} and the default {@code decodeLast} invoke
 * {@link #decode(ChannelHandlerContext, Buffer)}.
 *
 * <p>NOTE(review): despite the name, this body only delegates to {@code decode}; no removal or
 * re-entry guard is visible here. Removal is checked by the caller via {@code ctx.isRemoved()}.
 *
 * @param ctx the context passed on to {@code decode}
 * @param in  the buffer passed on to {@code decode}
 * @throws Exception if {@code decode} throws
 */
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, Buffer in)
        throws Exception {
    decode(ctx, in);
}
380
381
382
383
384
385
386
387
388 protected void decodeLast(ChannelHandlerContext ctx, Buffer in) throws Exception {
389 if (in.readableBytes() > 0) {
390
391
392 decodeRemovalReentryProtection(ctx, in);
393 }
394 }
395
396
397
398
/**
 * Strategy for combining newly received {@link Buffer}s with the bytes a decoder has already buffered.
 */
public interface Cumulator {
    /**
     * Combines the existing cumulation with newly received data and returns the buffer that the
     * decoder uses as its cumulation from then on.
     *
     * <p>The built-in implementations close {@code cumulation} and return {@code in} when the
     * cumulation has no readable bytes; otherwise they close {@code in} once its contents have been
     * merged.
     *
     * @param alloc      allocator available for creating a replacement or composite buffer
     * @param cumulation the bytes buffered so far
     * @param in         the newly received bytes
     * @return the buffer holding the combined readable bytes
     */
    Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in);

    /**
     * Drops bytes that have already been read from the given cumulation, where the implementation
     * considers that worthwhile.
     *
     * <p>Both built-in implementations return the same instance they were given.
     *
     * @param cumulation the buffer to trim
     * @return the buffer holding the remaining readable bytes
     */
    Buffer discardSomeReadBytes(Buffer cumulation);
}
417
418
419 static final class ByteToMessageDecoderContext extends DelegatingChannelHandlerContext {
420 private int fireChannelReadCalled;
421
422 private ByteToMessageDecoderContext(ChannelHandlerContext ctx) {
423 super(ctx);
424 }
425
426 void reset() {
427 fireChannelReadCalled = 0;
428 }
429
430 int fireChannelReadCallCount() {
431 return fireChannelReadCalled;
432 }
433
434 @Override
435 public ChannelHandlerContext fireChannelRead(Object msg) {
436 fireChannelReadCalled ++;
437 super.fireChannelRead(msg);
438 return this;
439 }
440 }
441
442 private static final class CompositeBufferCumulator implements Cumulator {
443 @Override
444 public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
445 if (cumulation.readableBytes() == 0) {
446 cumulation.close();
447 return in;
448 }
449 try (in) {
450 if (in.readableBytes() == 0) {
451 return cumulation;
452 }
453 if (cumulation.readOnly()) {
454 Buffer tmp = cumulation.copy();
455 cumulation.close();
456 cumulation = tmp;
457 }
458 if (CompositeBuffer.isComposite(cumulation)) {
459 CompositeBuffer composite = (CompositeBuffer) cumulation;
460 composite.extendWith(prepareInForCompose(in));
461 return composite;
462 }
463 return alloc.compose(Arrays.asList(cumulation.send(), prepareInForCompose(in)));
464 }
465 }
466
467 private static Send<Buffer> prepareInForCompose(Buffer in) {
468 return in.readOnly() ? in.copy().send() : in.send();
469 }
470
471 @Override
472 public Buffer discardSomeReadBytes(Buffer cumulation) {
473
474
475
476 cumulation.readSplit(0).close();
477 return cumulation;
478 }
479
480 @Override
481 public String toString() {
482 return "CompositeBufferCumulator";
483 }
484 }
485
486 private static final class MergeCumulator implements Cumulator {
487 @Override
488 public Buffer cumulate(BufferAllocator alloc, Buffer cumulation, Buffer in) {
489 if (cumulation.readableBytes() == 0) {
490
491 cumulation.close();
492 return in;
493 }
494
495
496 try (in) {
497 final int required = in.readableBytes();
498 if (required > cumulation.writableBytes() || cumulation.readOnly()) {
499 return expandCumulationAndWrite(alloc, cumulation, in);
500 }
501 cumulation.writeBytes(in);
502 return cumulation;
503 }
504 }
505
506 @Override
507 public Buffer discardSomeReadBytes(Buffer cumulation) {
508 if (cumulation.readerOffset() > cumulation.writableBytes()) {
509 cumulation.compact();
510 }
511 return cumulation;
512 }
513
514 private static Buffer expandCumulationAndWrite(BufferAllocator alloc, Buffer oldCumulation, Buffer in) {
515 final int newSize = safeFindNextPositivePowerOfTwo(oldCumulation.readableBytes() + in.readableBytes());
516 Buffer newCumulation = oldCumulation.readOnly() ? alloc.allocate(newSize) :
517 oldCumulation.ensureWritable(newSize);
518 try {
519 if (newCumulation != oldCumulation) {
520 newCumulation.writeBytes(oldCumulation);
521 }
522 newCumulation.writeBytes(in);
523 return newCumulation;
524 } finally {
525 if (newCumulation != oldCumulation) {
526 oldCumulation.close();
527 }
528 }
529 }
530
531 @Override
532 public String toString() {
533 return "MergeCumulator";
534 }
535 }
536 }