1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.redis;
17
18 import io.netty.channel.ChannelHandlerContext;
19 import io.netty.handler.codec.CodecException;
20 import io.netty.handler.codec.MessageToMessageDecoder;
21 import io.netty.handler.codec.PrematureChannelClosureException;
22 import io.netty.util.ReferenceCountUtil;
23 import io.netty.util.internal.ObjectUtil;
24 import io.netty.util.internal.UnstableApi;
25
26 import java.util.ArrayDeque;
27 import java.util.ArrayList;
28 import java.util.Deque;
29 import java.util.List;
30
31
32
33
34
35 @UnstableApi
36 public final class RedisArrayAggregator extends MessageToMessageDecoder<RedisMessage> {
37
38 private static final int DEFAULT_MAX_ARRAY_LENGTH = RedisConstants.REDIS_MAX_ARRAY_LENGTH;
39 private final int maxNestedArrayDepth;
40 private final Deque<AggregateState> depths = new ArrayDeque<AggregateState>(4);
41 private final int maxElements;
42
43
44
45
46
47
48
49
50
51
52 @Deprecated
53 public RedisArrayAggregator() {
54
55 this(DEFAULT_MAX_ARRAY_LENGTH, 1024);
56 }
57
58
59
60
61
62
63
64
65
66
67 public RedisArrayAggregator(int maxElements, int maxNestedArrayDepth) {
68 super(RedisMessage.class);
69 this.maxElements = ObjectUtil.checkPositive(maxElements, "maxElements");
70 this.maxNestedArrayDepth = ObjectUtil.checkPositive(maxNestedArrayDepth, "maxNestedArrayDepth");
71 }
72
73 @Override
74 protected void decode(ChannelHandlerContext ctx, RedisMessage msg, List<Object> out) throws Exception {
75 if (msg instanceof ArrayHeaderRedisMessage) {
76 msg = decodeRedisArrayHeader((ArrayHeaderRedisMessage) msg);
77 if (msg == null) {
78 return;
79 }
80 } else {
81 ReferenceCountUtil.retain(msg);
82 }
83
84 while (!depths.isEmpty()) {
85 AggregateState current = depths.peek();
86 current.children.add(msg);
87
88
89 if (current.children.size() == current.length) {
90 msg = new ArrayRedisMessage(current.children);
91 depths.pop();
92 } else {
93
94 return;
95 }
96 }
97
98 out.add(msg);
99 }
100
101 private RedisMessage decodeRedisArrayHeader(ArrayHeaderRedisMessage header) {
102 if (header.isNull()) {
103 return ArrayRedisMessage.NULL_INSTANCE;
104 } else if (header.length() == 0L) {
105 return ArrayRedisMessage.EMPTY_INSTANCE;
106 } else if (header.length() > 0L) {
107
108 if (header.length() > maxElements) {
109 throw new CodecException("this codec doesn't support longer length than " + maxElements);
110 }
111
112 if (depths.size() >= maxNestedArrayDepth) {
113 releaseAndClearDepths();
114 throw new CodecException("max nested array depth exceeded: " + maxNestedArrayDepth);
115 }
116
117 depths.push(new AggregateState((int) header.length()));
118 return null;
119 } else {
120 throw new CodecException("bad length: " + header.length());
121 }
122 }
123
124 private static final class AggregateState {
125 private final int length;
126 private final List<RedisMessage> children;
127 AggregateState(int length) {
128 this.length = length;
129 this.children = new ArrayList<RedisMessage>(length);
130 }
131 }
132
133 @Override
134 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
135 super.handlerRemoved(ctx);
136 releaseAndClearDepths();
137 }
138
139 private void releaseAndClearDepths() {
140 for (AggregateState state : depths) {
141 for (RedisMessage message : state.children) {
142 ReferenceCountUtil.safeRelease(message);
143 }
144 }
145 depths.clear();
146 }
147
148 @Override
149 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
150 super.channelInactive(ctx);
151
152 if (!depths.isEmpty()) {
153 ctx.fireExceptionCaught(new PrematureChannelClosureException(
154 "channel gone inactive with " + depths.size() +
155 " messages still incomplete"));
156 }
157 }
158 }