View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  
16  package io.netty.handler.codec.redis;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.ByteToMessageDecoder;
21  import io.netty.util.ByteProcessor;
22  import io.netty.util.CharsetUtil;
23  import io.netty.util.internal.UnstableApi;
24  
25  import java.util.List;
26  
27  /**
28   * Decodes the Redis protocol into {@link RedisMessage} objects following
29   * <a href="http://redis.io/topics/protocol">RESP (REdis Serialization Protocol)</a>.
30   *
31   * {@link RedisMessage} parts can be aggregated to {@link RedisMessage} using
32   * {@link RedisArrayAggregator} or processed directly.
33   */
34  @UnstableApi
35  public final class RedisDecoder extends ByteToMessageDecoder {
36  
37      private final ToPositiveLongProcessor toPositiveLongProcessor = new ToPositiveLongProcessor();
38  
39      private final int maxInlineMessageLength;
40      private final RedisMessagePool messagePool;
41  
42      // current decoding states
43      private State state = State.DECODE_TYPE;
44      private RedisMessageType type;
45      private int remainingBulkLength;
46  
47      private enum State {
48          DECODE_TYPE,
49          DECODE_INLINE, // SIMPLE_STRING, ERROR, INTEGER
50          DECODE_LENGTH, // BULK_STRING, ARRAY_HEADER
51          DECODE_BULK_STRING_EOL,
52          DECODE_BULK_STRING_CONTENT,
53      }
54  
55      /**
56       * Creates a new instance with default {@code maxInlineMessageLength} and {@code messagePool}.
57       */
58      public RedisDecoder() {
59          // 1024 * 64 is max inline length of current Redis server implementation.
60          this(1024 * 64, FixedRedisMessagePool.INSTANCE);
61      }
62  
63      /**
64       * Creates a new instance.
65       * @param maxInlineMessageLength the maximum length of inline message.
66       * @param messagePool the predefined message pool.
67       */
68      public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) {
69          if (maxInlineMessageLength <= 0 || maxInlineMessageLength > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
70              throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength +
71                                            " (expected: <= " + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
72          }
73          this.maxInlineMessageLength = maxInlineMessageLength;
74          this.messagePool = messagePool;
75      }
76  
77      @Override
78      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
79          try {
80              for (;;) {
81                  switch (state) {
82                  case DECODE_TYPE:
83                      if (!decodeType(in)) {
84                          return;
85                      }
86                      break;
87                  case DECODE_INLINE:
88                      if (!decodeInline(in, out)) {
89                          return;
90                      }
91                      break;
92                  case DECODE_LENGTH:
93                      if (!decodeLength(in, out)) {
94                          return;
95                      }
96                      break;
97                  case DECODE_BULK_STRING_EOL:
98                      if (!decodeBulkStringEndOfLine(in, out)) {
99                          return;
100                     }
101                     break;
102                 case DECODE_BULK_STRING_CONTENT:
103                     if (!decodeBulkStringContent(in, out)) {
104                         return;
105                     }
106                     break;
107                 default:
108                     throw new RedisCodecException("Unknown state: " + state);
109                 }
110             }
111         } catch (RedisCodecException e) {
112             resetDecoder();
113             throw e;
114         } catch (Exception e) {
115             resetDecoder();
116             throw new RedisCodecException(e);
117         }
118     }
119 
120     private void resetDecoder() {
121         state = State.DECODE_TYPE;
122         remainingBulkLength = 0;
123     }
124 
125     private boolean decodeType(ByteBuf in) throws Exception {
126         if (!in.isReadable()) {
127             return false;
128         }
129         type = RedisMessageType.valueOf(in.readByte());
130         state = type.isInline() ? State.DECODE_INLINE : State.DECODE_LENGTH;
131         return true;
132     }
133 
134     private boolean decodeInline(ByteBuf in, List<Object> out) throws Exception {
135         ByteBuf lineBytes = readLine(in);
136         if (lineBytes == null) {
137             if (in.readableBytes() > maxInlineMessageLength) {
138                 throw new RedisCodecException("length: " + in.readableBytes() +
139                                               " (expected: <= " + maxInlineMessageLength + ")");
140             }
141             return false;
142         }
143         out.add(newInlineRedisMessage(type, lineBytes));
144         resetDecoder();
145         return true;
146     }
147 
148     private boolean decodeLength(ByteBuf in, List<Object> out) throws Exception {
149         ByteBuf lineByteBuf = readLine(in);
150         if (lineByteBuf == null) {
151             return false;
152         }
153         final long length = parseRedisNumber(lineByteBuf);
154         if (length < RedisConstants.NULL_VALUE) {
155             throw new RedisCodecException("length: " + length + " (expected: >= " + RedisConstants.NULL_VALUE + ")");
156         }
157         switch (type) {
158         case ARRAY_HEADER:
159             out.add(new ArrayHeaderRedisMessage(length));
160             resetDecoder();
161             return true;
162         case BULK_STRING:
163             if (length > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
164                 throw new RedisCodecException("length: " + length + " (expected: <= " +
165                                               RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
166             }
167             remainingBulkLength = (int) length; // range(int) is already checked.
168             return decodeBulkString(in, out);
169         default:
170             throw new RedisCodecException("bad type: " + type);
171         }
172     }
173 
174     private boolean decodeBulkString(ByteBuf in, List<Object> out) throws Exception {
175         switch (remainingBulkLength) {
176         case RedisConstants.NULL_VALUE: // $-1\r\n
177             out.add(FullBulkStringRedisMessage.NULL_INSTANCE);
178             resetDecoder();
179             return true;
180         case 0:
181             state = State.DECODE_BULK_STRING_EOL;
182             return decodeBulkStringEndOfLine(in, out);
183         default: // expectedBulkLength is always positive.
184             out.add(new BulkStringHeaderRedisMessage(remainingBulkLength));
185             state = State.DECODE_BULK_STRING_CONTENT;
186             return decodeBulkStringContent(in, out);
187         }
188     }
189 
190     // $0\r\n <here> \r\n
191     private boolean decodeBulkStringEndOfLine(ByteBuf in, List<Object> out) throws Exception {
192         if (in.readableBytes() < RedisConstants.EOL_LENGTH) {
193             return false;
194         }
195         readEndOfLine(in);
196         out.add(FullBulkStringRedisMessage.EMPTY_INSTANCE);
197         resetDecoder();
198         return true;
199     }
200 
201     // ${expectedBulkLength}\r\n <here> {data...}\r\n
202     private boolean decodeBulkStringContent(ByteBuf in, List<Object> out) throws Exception {
203         final int readableBytes = in.readableBytes();
204         if (readableBytes == 0 || remainingBulkLength == 0 && readableBytes < RedisConstants.EOL_LENGTH) {
205             return false;
206         }
207 
208         // if this is last frame.
209         if (readableBytes >= remainingBulkLength + RedisConstants.EOL_LENGTH) {
210             ByteBuf content = in.readSlice(remainingBulkLength);
211             readEndOfLine(in);
212             // Only call retain after readEndOfLine(...) as the method may throw an exception.
213             out.add(new DefaultLastBulkStringRedisContent(content.retain()));
214             resetDecoder();
215             return true;
216         }
217 
218         // chunked write.
219         int toRead = Math.min(remainingBulkLength, readableBytes);
220         remainingBulkLength -= toRead;
221         out.add(new DefaultBulkStringRedisContent(in.readSlice(toRead).retain()));
222         return true;
223     }
224 
225     private static void readEndOfLine(final ByteBuf in) {
226         final short delim = in.readShort();
227         if (RedisConstants.EOL_SHORT == delim) {
228             return;
229         }
230         final byte[] bytes = RedisCodecUtil.shortToBytes(delim);
231         throw new RedisCodecException("delimiter: [" + bytes[0] + "," + bytes[1] + "] (expected: \\r\\n)");
232     }
233 
234     private RedisMessage newInlineRedisMessage(RedisMessageType messageType, ByteBuf content) {
235         switch (messageType) {
236         case SIMPLE_STRING: {
237             SimpleStringRedisMessage cached = messagePool.getSimpleString(content);
238             return cached != null ? cached : new SimpleStringRedisMessage(content.toString(CharsetUtil.UTF_8));
239         }
240         case ERROR: {
241             ErrorRedisMessage cached = messagePool.getError(content);
242             return cached != null ? cached : new ErrorRedisMessage(content.toString(CharsetUtil.UTF_8));
243         }
244         case INTEGER: {
245             IntegerRedisMessage cached = messagePool.getInteger(content);
246             return cached != null ? cached : new IntegerRedisMessage(parseRedisNumber(content));
247         }
248         default:
249             throw new RedisCodecException("bad type: " + messageType);
250         }
251     }
252 
253     private static ByteBuf readLine(ByteBuf in) {
254         if (!in.isReadable(RedisConstants.EOL_LENGTH)) {
255             return null;
256         }
257         final int lfIndex = in.forEachByte(ByteProcessor.FIND_LF);
258         if (lfIndex < 0) {
259             return null;
260         }
261         ByteBuf data = in.readSlice(lfIndex - in.readerIndex() - 1); // `-1` is for CR
262         readEndOfLine(in); // validate CR LF
263         return data;
264     }
265 
266     private long parseRedisNumber(ByteBuf byteBuf) {
267         final int readableBytes = byteBuf.readableBytes();
268         final boolean negative = readableBytes > 0 && byteBuf.getByte(byteBuf.readerIndex()) == '-';
269         final int extraOneByteForNegative = negative ? 1 : 0;
270         if (readableBytes <= extraOneByteForNegative) {
271             throw new RedisCodecException("no number to parse: " + byteBuf.toString(CharsetUtil.US_ASCII));
272         }
273         if (readableBytes > RedisConstants.POSITIVE_LONG_MAX_LENGTH + extraOneByteForNegative) {
274             throw new RedisCodecException("too many characters to be a valid RESP Integer: " +
275                                           byteBuf.toString(CharsetUtil.US_ASCII));
276         }
277         if (negative) {
278             return -parsePositiveNumber(byteBuf.skipBytes(extraOneByteForNegative));
279         }
280         return parsePositiveNumber(byteBuf);
281     }
282 
283     private long parsePositiveNumber(ByteBuf byteBuf) {
284         toPositiveLongProcessor.reset();
285         byteBuf.forEachByte(toPositiveLongProcessor);
286         return toPositiveLongProcessor.content();
287     }
288 
289     private static final class ToPositiveLongProcessor implements ByteProcessor {
290         private long result;
291 
292         @Override
293         public boolean process(byte value) throws Exception {
294             if (value < '0' || value > '9') {
295                 throw new RedisCodecException("bad byte in number: " + value);
296             }
297             result = result * 10 + (value - '0');
298             return true;
299         }
300 
301         public long content() {
302             return result;
303         }
304 
305         public void reset() {
306             result = 0;
307         }
308     }
309 }