1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.redis;
17
18 import io.netty.channel.ChannelHandlerContext;
19 import io.netty.handler.codec.CodecException;
20 import io.netty.handler.codec.MessageToMessageDecoder;
21 import io.netty.util.ReferenceCountUtil;
22 import io.netty.util.internal.UnstableApi;
23
24 import java.util.ArrayDeque;
25 import java.util.ArrayList;
26 import java.util.Deque;
27 import java.util.List;
28
29
30
31
32
33 @UnstableApi
34 public final class RedisArrayAggregator extends MessageToMessageDecoder<RedisMessage> {
35
36 private final Deque<AggregateState> depths = new ArrayDeque<AggregateState>(4);
37
38 public RedisArrayAggregator() {
39 super(RedisMessage.class);
40 }
41
42 @Override
43 protected void decode(ChannelHandlerContext ctx, RedisMessage msg, List<Object> out) throws Exception {
44 if (msg instanceof ArrayHeaderRedisMessage) {
45 msg = decodeRedisArrayHeader((ArrayHeaderRedisMessage) msg);
46 if (msg == null) {
47 return;
48 }
49 } else {
50 ReferenceCountUtil.retain(msg);
51 }
52
53 while (!depths.isEmpty()) {
54 AggregateState current = depths.peek();
55 current.children.add(msg);
56
57
58 if (current.children.size() == current.length) {
59 msg = new ArrayRedisMessage(current.children);
60 depths.pop();
61 } else {
62
63 return;
64 }
65 }
66
67 out.add(msg);
68 }
69
70 private RedisMessage decodeRedisArrayHeader(ArrayHeaderRedisMessage header) {
71 if (header.isNull()) {
72 return ArrayRedisMessage.NULL_INSTANCE;
73 } else if (header.length() == 0L) {
74 return ArrayRedisMessage.EMPTY_INSTANCE;
75 } else if (header.length() > 0L) {
76
77 if (header.length() > Integer.MAX_VALUE) {
78 throw new CodecException("this codec doesn't support longer length than " + Integer.MAX_VALUE);
79 }
80
81
82 depths.push(new AggregateState((int) header.length()));
83 return null;
84 } else {
85 throw new CodecException("bad length: " + header.length());
86 }
87 }
88
89 private static final class AggregateState {
90 private final int length;
91 private final List<RedisMessage> children;
92 AggregateState(int length) {
93 this.length = length;
94 this.children = new ArrayList<RedisMessage>(length);
95 }
96 }
97 }