1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.redis;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.handler.codec.ByteToMessageDecoder;
21 import io.netty.util.ByteProcessor;
22 import io.netty.util.CharsetUtil;
23 import io.netty.util.internal.UnstableApi;
24
25 import java.util.List;
26
27
28
29
30
31
32
33
34 @UnstableApi
35 public final class RedisDecoder extends ByteToMessageDecoder {
36
37 private final ToPositiveLongProcessor toPositiveLongProcessor = new ToPositiveLongProcessor();
38
39 private final boolean decodeInlineCommands;
40 private final int maxInlineMessageLength;
41 private final RedisMessagePool messagePool;
42
43
44 private State state = State.DECODE_TYPE;
45 private RedisMessageType type;
46 private int remainingBulkLength;
47
48 private enum State {
49 DECODE_TYPE,
50 DECODE_INLINE,
51 DECODE_LENGTH,
52 DECODE_BULK_STRING_EOL,
53 DECODE_BULK_STRING_CONTENT,
54 }
55
56
57
58
59
60 public RedisDecoder() {
61 this(false);
62 }
63
64
65
66
67
68 public RedisDecoder(boolean decodeInlineCommands) {
69 this(RedisConstants.REDIS_INLINE_MESSAGE_MAX_LENGTH, FixedRedisMessagePool.INSTANCE, decodeInlineCommands);
70 }
71
72
73
74
75
76
77 public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) {
78 this(maxInlineMessageLength, messagePool, false);
79 }
80
81
82
83
84
85
86
87 public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool, boolean decodeInlineCommands) {
88 if (maxInlineMessageLength <= 0 || maxInlineMessageLength > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
89 throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength +
90 " (expected: <= " + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
91 }
92 this.maxInlineMessageLength = maxInlineMessageLength;
93 this.messagePool = messagePool;
94 this.decodeInlineCommands = decodeInlineCommands;
95 }
96
97 @Override
98 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
99 try {
100 for (;;) {
101 switch (state) {
102 case DECODE_TYPE:
103 if (!decodeType(in)) {
104 return;
105 }
106 break;
107 case DECODE_INLINE:
108 if (!decodeInline(in, out)) {
109 return;
110 }
111 break;
112 case DECODE_LENGTH:
113 if (!decodeLength(in, out)) {
114 return;
115 }
116 break;
117 case DECODE_BULK_STRING_EOL:
118 if (!decodeBulkStringEndOfLine(in, out)) {
119 return;
120 }
121 break;
122 case DECODE_BULK_STRING_CONTENT:
123 if (!decodeBulkStringContent(in, out)) {
124 return;
125 }
126 break;
127 default:
128 throw new RedisCodecException("Unknown state: " + state);
129 }
130 }
131 } catch (RedisCodecException e) {
132
133 in.skipBytes(in.readableBytes());
134 resetDecoder();
135 throw e;
136 } catch (Exception e) {
137
138 in.skipBytes(in.readableBytes());
139 resetDecoder();
140 throw new RedisCodecException(e);
141 }
142 }
143
144 private void resetDecoder() {
145 state = State.DECODE_TYPE;
146 remainingBulkLength = 0;
147 }
148
149 private boolean decodeType(ByteBuf in) throws Exception {
150 if (!in.isReadable()) {
151 return false;
152 }
153
154 type = RedisMessageType.readFrom(in, decodeInlineCommands);
155 state = type.isInline() ? State.DECODE_INLINE : State.DECODE_LENGTH;
156 return true;
157 }
158
159 private boolean decodeInline(ByteBuf in, List<Object> out) throws Exception {
160 ByteBuf lineBytes = readLine(in);
161 if (lineBytes == null) {
162 if (in.readableBytes() > maxInlineMessageLength) {
163 throw new RedisCodecException("length: " + in.readableBytes() +
164 " (expected: <= " + maxInlineMessageLength + ")");
165 }
166 return false;
167 }
168 out.add(newInlineRedisMessage(type, lineBytes));
169 resetDecoder();
170 return true;
171 }
172
173 private boolean decodeLength(ByteBuf in, List<Object> out) throws Exception {
174 ByteBuf lineByteBuf = readLine(in);
175 if (lineByteBuf == null) {
176 int readableBytes = in.readableBytes();
177 if (readableBytes <= RedisConstants.POSITIVE_LONG_MAX_LENGTH) {
178
179 return false;
180 }
181 boolean isNegative = in.getByte(in.readerIndex()) == '-';
182 int capacity = RedisConstants.POSITIVE_LONG_MAX_LENGTH + (isNegative ? 1 : 0) + 1;
183 if (readableBytes > capacity) {
184 throw new RedisCodecException("too many characters to be a valid RESP Integer: " + readableBytes);
185 }
186 return false;
187 }
188 final long length = parseRedisNumber(lineByteBuf);
189 if (length < RedisConstants.NULL_VALUE) {
190 throw new RedisCodecException("length: " + length + " (expected: >= " + RedisConstants.NULL_VALUE + ")");
191 }
192 switch (type) {
193 case ARRAY_HEADER:
194 out.add(new ArrayHeaderRedisMessage(length));
195 resetDecoder();
196 return true;
197 case BULK_STRING:
198 if (length > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
199 throw new RedisCodecException("length: " + length + " (expected: <= " +
200 RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
201 }
202 remainingBulkLength = (int) length;
203 return decodeBulkString(in, out);
204 default:
205 throw new RedisCodecException("bad type: " + type);
206 }
207 }
208
209 private boolean decodeBulkString(ByteBuf in, List<Object> out) throws Exception {
210 switch (remainingBulkLength) {
211 case RedisConstants.NULL_VALUE:
212 out.add(FullBulkStringRedisMessage.NULL_INSTANCE);
213 resetDecoder();
214 return true;
215 case 0:
216 state = State.DECODE_BULK_STRING_EOL;
217 return decodeBulkStringEndOfLine(in, out);
218 default:
219 out.add(new BulkStringHeaderRedisMessage(remainingBulkLength));
220 state = State.DECODE_BULK_STRING_CONTENT;
221 return decodeBulkStringContent(in, out);
222 }
223 }
224
225
226 private boolean decodeBulkStringEndOfLine(ByteBuf in, List<Object> out) throws Exception {
227 if (in.readableBytes() < RedisConstants.EOL_LENGTH) {
228 return false;
229 }
230 readEndOfLine(in);
231 out.add(FullBulkStringRedisMessage.EMPTY_INSTANCE);
232 resetDecoder();
233 return true;
234 }
235
236
237 private boolean decodeBulkStringContent(ByteBuf in, List<Object> out) throws Exception {
238 final int readableBytes = in.readableBytes();
239 if (readableBytes == 0 || remainingBulkLength == 0 && readableBytes < RedisConstants.EOL_LENGTH) {
240 return false;
241 }
242
243
244 if (readableBytes >= remainingBulkLength + RedisConstants.EOL_LENGTH) {
245 ByteBuf content = in.readSlice(remainingBulkLength);
246 readEndOfLine(in);
247
248 out.add(new DefaultLastBulkStringRedisContent(content.retain()));
249 resetDecoder();
250 return true;
251 }
252
253
254 int toRead = Math.min(remainingBulkLength, readableBytes);
255 remainingBulkLength -= toRead;
256 out.add(new DefaultBulkStringRedisContent(in.readSlice(toRead).retain()));
257 return true;
258 }
259
260 private static void readEndOfLine(final ByteBuf in) {
261 final short delim = in.readShort();
262 if (RedisConstants.EOL_SHORT == delim) {
263 return;
264 }
265 final byte[] bytes = RedisCodecUtil.shortToBytes(delim);
266 throw new RedisCodecException("delimiter: [" + bytes[0] + "," + bytes[1] + "] (expected: \\r\\n)");
267 }
268
269 private RedisMessage newInlineRedisMessage(RedisMessageType messageType, ByteBuf content) {
270 switch (messageType) {
271 case INLINE_COMMAND:
272 return new InlineCommandRedisMessage(content.toString(CharsetUtil.UTF_8));
273 case SIMPLE_STRING: {
274 SimpleStringRedisMessage cached = messagePool.getSimpleString(content);
275 return cached != null ? cached : new SimpleStringRedisMessage(content.toString(CharsetUtil.UTF_8));
276 }
277 case ERROR: {
278 ErrorRedisMessage cached = messagePool.getError(content);
279 return cached != null ? cached : new ErrorRedisMessage(content.toString(CharsetUtil.UTF_8));
280 }
281 case INTEGER: {
282 IntegerRedisMessage cached = messagePool.getInteger(content);
283 return cached != null ? cached : new IntegerRedisMessage(parseRedisNumber(content));
284 }
285 default:
286 throw new RedisCodecException("bad type: " + messageType);
287 }
288 }
289
290 private static ByteBuf readLine(ByteBuf in) {
291 if (!in.isReadable(RedisConstants.EOL_LENGTH)) {
292 return null;
293 }
294 final int lfIndex = in.indexOf(in.readerIndex(), in.writerIndex(), (byte) '\n');
295 if (lfIndex < 0) {
296 return null;
297 }
298 ByteBuf data = in.readSlice(lfIndex - in.readerIndex() - 1);
299 readEndOfLine(in);
300 return data;
301 }
302
303 private long parseRedisNumber(ByteBuf byteBuf) {
304 final int readableBytes = byteBuf.readableBytes();
305 final boolean negative = readableBytes > 0 && byteBuf.getByte(byteBuf.readerIndex()) == '-';
306 final int extraOneByteForNegative = negative ? 1 : 0;
307 if (readableBytes <= extraOneByteForNegative) {
308 throw new RedisCodecException("no number to parse: " + byteBuf.toString(CharsetUtil.US_ASCII));
309 }
310 if (readableBytes > RedisConstants.POSITIVE_LONG_MAX_LENGTH + extraOneByteForNegative) {
311 throw new RedisCodecException("too many characters to be a valid RESP Integer: " +
312 byteBuf.toString(CharsetUtil.US_ASCII));
313 }
314 if (negative) {
315 return -parsePositiveNumber(byteBuf.skipBytes(extraOneByteForNegative));
316 }
317 return parsePositiveNumber(byteBuf);
318 }
319
320 private long parsePositiveNumber(ByteBuf byteBuf) {
321 toPositiveLongProcessor.reset();
322 byteBuf.forEachByte(toPositiveLongProcessor);
323 return toPositiveLongProcessor.content();
324 }
325
326 private static final class ToPositiveLongProcessor implements ByteProcessor {
327 private long result;
328
329 @Override
330 public boolean process(byte value) throws Exception {
331 if (value < '0' || value > '9') {
332 throw new RedisCodecException("bad byte in number: " + value);
333 }
334 result = result * 10 + (value - '0');
335 return true;
336 }
337
338 public long content() {
339 return result;
340 }
341
342 public void reset() {
343 result = 0;
344 }
345 }
346 }