View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  
16  package io.netty.handler.codec.redis;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.ByteToMessageDecoder;
21  import io.netty.util.ByteProcessor;
22  import io.netty.util.CharsetUtil;
23  import io.netty.util.internal.UnstableApi;
24  
25  import java.util.List;
26  
27  /**
28   * Decodes the Redis protocol into {@link RedisMessage} objects following
29   * <a href="https://redis.io/topics/protocol">RESP (REdis Serialization Protocol)</a>.
30   *
31   * {@link RedisMessage} parts can be aggregated to {@link RedisMessage} using
32   * {@link RedisArrayAggregator} or processed directly.
33   */
34  @UnstableApi
35  public final class RedisDecoder extends ByteToMessageDecoder {
36  
37      private final ToPositiveLongProcessor toPositiveLongProcessor = new ToPositiveLongProcessor();
38  
39      private final boolean decodeInlineCommands;
40      private final int maxInlineMessageLength;
41      private final RedisMessagePool messagePool;
42  
43      // current decoding states
44      private State state = State.DECODE_TYPE;
45      private RedisMessageType type;
46      private int remainingBulkLength;
47  
48      private enum State {
49          DECODE_TYPE,
50          DECODE_INLINE, // SIMPLE_STRING, ERROR, INTEGER
51          DECODE_LENGTH, // BULK_STRING, ARRAY_HEADER
52          DECODE_BULK_STRING_EOL,
53          DECODE_BULK_STRING_CONTENT,
54      }
55  
56      /**
57       * Creates a new instance with default {@code maxInlineMessageLength} and {@code messagePool}
58       * and inline command decoding disabled.
59       */
60      public RedisDecoder() {
61          this(false);
62      }
63  
64      /**
65       * Creates a new instance with default {@code maxInlineMessageLength} and {@code messagePool}.
66       * @param decodeInlineCommands if {@code true}, inline commands will be decoded.
67       */
68      public RedisDecoder(boolean decodeInlineCommands) {
69          this(RedisConstants.REDIS_INLINE_MESSAGE_MAX_LENGTH, FixedRedisMessagePool.INSTANCE, decodeInlineCommands);
70      }
71  
72      /**
73       * Creates a new instance with inline command decoding disabled.
74       * @param maxInlineMessageLength the maximum length of inline message.
75       * @param messagePool the predefined message pool.
76       */
77      public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) {
78          this(maxInlineMessageLength, messagePool, false);
79      }
80  
81      /**
82       * Creates a new instance.
83       * @param maxInlineMessageLength the maximum length of inline message.
84       * @param messagePool the predefined message pool.
85       * @param decodeInlineCommands if {@code true}, inline commands will be decoded.
86       */
87      public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool, boolean decodeInlineCommands) {
88          if (maxInlineMessageLength <= 0 || maxInlineMessageLength > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
89              throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength +
90                                            " (expected: <= " + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
91          }
92          this.maxInlineMessageLength = maxInlineMessageLength;
93          this.messagePool = messagePool;
94          this.decodeInlineCommands = decodeInlineCommands;
95      }
96  
97      @Override
98      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
99          try {
100             for (;;) {
101                 switch (state) {
102                 case DECODE_TYPE:
103                     if (!decodeType(in)) {
104                         return;
105                     }
106                     break;
107                 case DECODE_INLINE:
108                     if (!decodeInline(in, out)) {
109                         return;
110                     }
111                     break;
112                 case DECODE_LENGTH:
113                     if (!decodeLength(in, out)) {
114                         return;
115                     }
116                     break;
117                 case DECODE_BULK_STRING_EOL:
118                     if (!decodeBulkStringEndOfLine(in, out)) {
119                         return;
120                     }
121                     break;
122                 case DECODE_BULK_STRING_CONTENT:
123                     if (!decodeBulkStringContent(in, out)) {
124                         return;
125                     }
126                     break;
127                 default:
128                     throw new RedisCodecException("Unknown state: " + state);
129                 }
130             }
131         } catch (RedisCodecException e) {
132             // Let's discard everything
133             in.skipBytes(in.readableBytes());
134             resetDecoder();
135             throw e;
136         } catch (Exception e) {
137             // Let's discard everything
138             in.skipBytes(in.readableBytes());
139             resetDecoder();
140             throw new RedisCodecException(e);
141         }
142     }
143 
144     private void resetDecoder() {
145         state = State.DECODE_TYPE;
146         remainingBulkLength = 0;
147     }
148 
149     private boolean decodeType(ByteBuf in) throws Exception {
150         if (!in.isReadable()) {
151             return false;
152         }
153 
154         type = RedisMessageType.readFrom(in, decodeInlineCommands);
155         state = type.isInline() ? State.DECODE_INLINE : State.DECODE_LENGTH;
156         return true;
157     }
158 
159     private boolean decodeInline(ByteBuf in, List<Object> out) throws Exception {
160         ByteBuf lineBytes = readLine(in);
161         if (lineBytes == null) {
162             if (in.readableBytes() > maxInlineMessageLength) {
163                 throw new RedisCodecException("length: " + in.readableBytes() +
164                                               " (expected: <= " + maxInlineMessageLength + ")");
165             }
166             return false;
167         }
168         out.add(newInlineRedisMessage(type, lineBytes));
169         resetDecoder();
170         return true;
171     }
172 
173     private boolean decodeLength(ByteBuf in, List<Object> out) throws Exception {
174         ByteBuf lineByteBuf = readLine(in);
175         if (lineByteBuf == null) {
176             int readableBytes = in.readableBytes();
177             if (readableBytes <= RedisConstants.POSITIVE_LONG_MAX_LENGTH) {
178                 // fast-path
179                 return false;
180             }
181             boolean isNegative = in.getByte(in.readerIndex()) == '-';
182             int capacity = RedisConstants.POSITIVE_LONG_MAX_LENGTH + (isNegative ? 1 : 0) + 1;
183             if (readableBytes > capacity) {
184                 throw new RedisCodecException("too many characters to be a valid RESP Integer: " + readableBytes);
185             }
186             return false;
187         }
188         final long length = parseRedisNumber(lineByteBuf);
189         if (length < RedisConstants.NULL_VALUE) {
190             throw new RedisCodecException("length: " + length + " (expected: >= " + RedisConstants.NULL_VALUE + ")");
191         }
192         switch (type) {
193         case ARRAY_HEADER:
194             out.add(new ArrayHeaderRedisMessage(length));
195             resetDecoder();
196             return true;
197         case BULK_STRING:
198             if (length > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
199                 throw new RedisCodecException("length: " + length + " (expected: <= " +
200                                               RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
201             }
202             remainingBulkLength = (int) length; // range(int) is already checked.
203             return decodeBulkString(in, out);
204         default:
205             throw new RedisCodecException("bad type: " + type);
206         }
207     }
208 
209     private boolean decodeBulkString(ByteBuf in, List<Object> out) throws Exception {
210         switch (remainingBulkLength) {
211         case RedisConstants.NULL_VALUE: // $-1\r\n
212             out.add(FullBulkStringRedisMessage.NULL_INSTANCE);
213             resetDecoder();
214             return true;
215         case 0:
216             state = State.DECODE_BULK_STRING_EOL;
217             return decodeBulkStringEndOfLine(in, out);
218         default: // expectedBulkLength is always positive.
219             out.add(new BulkStringHeaderRedisMessage(remainingBulkLength));
220             state = State.DECODE_BULK_STRING_CONTENT;
221             return decodeBulkStringContent(in, out);
222         }
223     }
224 
225     // $0\r\n <here> \r\n
226     private boolean decodeBulkStringEndOfLine(ByteBuf in, List<Object> out) throws Exception {
227         if (in.readableBytes() < RedisConstants.EOL_LENGTH) {
228             return false;
229         }
230         readEndOfLine(in);
231         out.add(FullBulkStringRedisMessage.EMPTY_INSTANCE);
232         resetDecoder();
233         return true;
234     }
235 
236     // ${expectedBulkLength}\r\n <here> {data...}\r\n
237     private boolean decodeBulkStringContent(ByteBuf in, List<Object> out) throws Exception {
238         final int readableBytes = in.readableBytes();
239         if (readableBytes == 0 || remainingBulkLength == 0 && readableBytes < RedisConstants.EOL_LENGTH) {
240             return false;
241         }
242 
243         // if this is last frame.
244         if (readableBytes >= remainingBulkLength + RedisConstants.EOL_LENGTH) {
245             ByteBuf content = in.readSlice(remainingBulkLength);
246             readEndOfLine(in);
247             // Only call retain after readEndOfLine(...) as the method may throw an exception.
248             out.add(new DefaultLastBulkStringRedisContent(content.retain()));
249             resetDecoder();
250             return true;
251         }
252 
253         // chunked write.
254         int toRead = Math.min(remainingBulkLength, readableBytes);
255         remainingBulkLength -= toRead;
256         out.add(new DefaultBulkStringRedisContent(in.readSlice(toRead).retain()));
257         return true;
258     }
259 
260     private static void readEndOfLine(final ByteBuf in) {
261         final short delim = in.readShort();
262         if (RedisConstants.EOL_SHORT == delim) {
263             return;
264         }
265         final byte[] bytes = RedisCodecUtil.shortToBytes(delim);
266         throw new RedisCodecException("delimiter: [" + bytes[0] + "," + bytes[1] + "] (expected: \\r\\n)");
267     }
268 
269     private RedisMessage newInlineRedisMessage(RedisMessageType messageType, ByteBuf content) {
270         switch (messageType) {
271         case INLINE_COMMAND:
272             return new InlineCommandRedisMessage(content.toString(CharsetUtil.UTF_8));
273         case SIMPLE_STRING: {
274             SimpleStringRedisMessage cached = messagePool.getSimpleString(content);
275             return cached != null ? cached : new SimpleStringRedisMessage(content.toString(CharsetUtil.UTF_8));
276         }
277         case ERROR: {
278             ErrorRedisMessage cached = messagePool.getError(content);
279             return cached != null ? cached : new ErrorRedisMessage(content.toString(CharsetUtil.UTF_8));
280         }
281         case INTEGER: {
282             IntegerRedisMessage cached = messagePool.getInteger(content);
283             return cached != null ? cached : new IntegerRedisMessage(parseRedisNumber(content));
284         }
285         default:
286             throw new RedisCodecException("bad type: " + messageType);
287         }
288     }
289 
290     private static ByteBuf readLine(ByteBuf in) {
291         if (!in.isReadable(RedisConstants.EOL_LENGTH)) {
292             return null;
293         }
294         final int lfIndex = in.indexOf(in.readerIndex(), in.writerIndex(), (byte) '\n');
295         if (lfIndex < 0) {
296             return null;
297         }
298         ByteBuf data = in.readSlice(lfIndex - in.readerIndex() - 1); // `-1` is for CR
299         readEndOfLine(in); // validate CR LF
300         return data;
301     }
302 
303     private long parseRedisNumber(ByteBuf byteBuf) {
304         final int readableBytes = byteBuf.readableBytes();
305         final boolean negative = readableBytes > 0 && byteBuf.getByte(byteBuf.readerIndex()) == '-';
306         final int extraOneByteForNegative = negative ? 1 : 0;
307         if (readableBytes <= extraOneByteForNegative) {
308             throw new RedisCodecException("no number to parse: " + byteBuf.toString(CharsetUtil.US_ASCII));
309         }
310         if (readableBytes > RedisConstants.POSITIVE_LONG_MAX_LENGTH + extraOneByteForNegative) {
311             throw new RedisCodecException("too many characters to be a valid RESP Integer: " +
312                                           byteBuf.toString(CharsetUtil.US_ASCII));
313         }
314         if (negative) {
315             return -parsePositiveNumber(byteBuf.skipBytes(extraOneByteForNegative));
316         }
317         return parsePositiveNumber(byteBuf);
318     }
319 
320     private long parsePositiveNumber(ByteBuf byteBuf) {
321         toPositiveLongProcessor.reset();
322         byteBuf.forEachByte(toPositiveLongProcessor);
323         return toPositiveLongProcessor.content();
324     }
325 
326     private static final class ToPositiveLongProcessor implements ByteProcessor {
327         private long result;
328 
329         @Override
330         public boolean process(byte value) throws Exception {
331             if (value < '0' || value > '9') {
332                 throw new RedisCodecException("bad byte in number: " + value);
333             }
334             result = result * 10 + (value - '0');
335             return true;
336         }
337 
338         public long content() {
339             return result;
340         }
341 
342         public void reset() {
343             result = 0;
344         }
345     }
346 }