View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.memcache.binary;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderResult;
22  import io.netty.handler.codec.memcache.AbstractMemcacheObjectDecoder;
23  import io.netty.handler.codec.memcache.DefaultLastMemcacheContent;
24  import io.netty.handler.codec.memcache.DefaultMemcacheContent;
25  import io.netty.handler.codec.memcache.LastMemcacheContent;
26  import io.netty.handler.codec.memcache.MemcacheContent;
27  import io.netty.util.CharsetUtil;
28  
29  import java.util.List;
30  
31  import static io.netty.buffer.ByteBufUtil.*;
32  
33  /**
34   * Decoder for both {@link BinaryMemcacheRequest} and {@link BinaryMemcacheResponse}.
35   * <p/>
36   * The difference in the protocols (header) is implemented by the subclasses.
37   */
38  public abstract class AbstractBinaryMemcacheDecoder<M extends BinaryMemcacheMessage>
39      extends AbstractMemcacheObjectDecoder {
40  
41      public static final int DEFAULT_MAX_CHUNK_SIZE = 8192;
42  
43      private final int chunkSize;
44  
45      private M currentMessage;
46      private int alreadyReadChunkSize;
47  
48      private State state = State.READ_HEADER;
49  
50      /**
51       * Create a new {@link AbstractBinaryMemcacheDecoder} with default settings.
52       */
53      protected AbstractBinaryMemcacheDecoder() {
54          this(DEFAULT_MAX_CHUNK_SIZE);
55      }
56  
57      /**
58       * Create a new {@link AbstractBinaryMemcacheDecoder} with custom settings.
59       *
60       * @param chunkSize the maximum chunk size of the payload.
61       */
62      protected AbstractBinaryMemcacheDecoder(int chunkSize) {
63          if (chunkSize < 0) {
64              throw new IllegalArgumentException("chunkSize must be a positive integer: " + chunkSize);
65          }
66  
67          this.chunkSize = chunkSize;
68      }
69  
70      @Override
71      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
72          switch (state) {
73              case READ_HEADER: try {
74                  if (in.readableBytes() < 24) {
75                      return;
76                  }
77                  resetDecoder();
78  
79                  currentMessage = decodeHeader(in);
80                  state = State.READ_EXTRAS;
81              } catch (Exception e) {
82                  out.add(invalidMessage(e));
83                  return;
84              }
85              case READ_EXTRAS: try {
86                  byte extrasLength = currentMessage.extrasLength();
87                  if (extrasLength > 0) {
88                      if (in.readableBytes() < extrasLength) {
89                          return;
90                      }
91  
92                      currentMessage.setExtras(readBytes(ctx.alloc(), in, extrasLength));
93                  }
94  
95                  state = State.READ_KEY;
96              } catch (Exception e) {
97                  out.add(invalidMessage(e));
98                  return;
99              }
100             case READ_KEY: try {
101                 short keyLength = currentMessage.keyLength();
102                 if (keyLength > 0) {
103                     if (in.readableBytes() < keyLength) {
104                         return;
105                     }
106 
107                     currentMessage.setKey(in.toString(in.readerIndex(), keyLength, CharsetUtil.UTF_8));
108                     in.skipBytes(keyLength);
109                 }
110 
111                 out.add(currentMessage);
112                 state = State.READ_CONTENT;
113             } catch (Exception e) {
114                 out.add(invalidMessage(e));
115                 return;
116             }
117             case READ_CONTENT: try {
118                 int valueLength = currentMessage.totalBodyLength()
119                     - currentMessage.keyLength()
120                     - currentMessage.extrasLength();
121                 int toRead = in.readableBytes();
122                 if (valueLength > 0) {
123                     if (toRead == 0) {
124                         return;
125                     }
126 
127                     if (toRead > chunkSize) {
128                         toRead = chunkSize;
129                     }
130 
131                     int remainingLength = valueLength - alreadyReadChunkSize;
132                     if (toRead > remainingLength) {
133                         toRead = remainingLength;
134                     }
135 
136                     ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
137 
138                     MemcacheContent chunk;
139                     if ((alreadyReadChunkSize += toRead) >= valueLength) {
140                         chunk = new DefaultLastMemcacheContent(chunkBuffer);
141                     } else {
142                         chunk = new DefaultMemcacheContent(chunkBuffer);
143                     }
144 
145                     out.add(chunk);
146                     if (alreadyReadChunkSize < valueLength) {
147                         return;
148                     }
149                 } else {
150                     out.add(LastMemcacheContent.EMPTY_LAST_CONTENT);
151                 }
152 
153                 state = State.READ_HEADER;
154                 return;
155             } catch (Exception e) {
156                 out.add(invalidChunk(e));
157                 return;
158             }
159             case BAD_MESSAGE:
160                 in.skipBytes(actualReadableBytes());
161                 return;
162             default:
163                 throw new Error("Unknown state reached: " + state);
164         }
165     }
166 
167     /**
168      * Helper method to create a message indicating a invalid decoding result.
169      *
170      * @param cause the cause of the decoding failure.
171      * @return a valid message indicating failure.
172      */
173     private M invalidMessage(Exception cause) {
174         state = State.BAD_MESSAGE;
175         M message = buildInvalidMessage();
176         message.setDecoderResult(DecoderResult.failure(cause));
177         return message;
178     }
179 
180     /**
181      * Helper method to create a content chunk indicating a invalid decoding result.
182      *
183      * @param cause the cause of the decoding failure.
184      * @return a valid content chunk indicating failure.
185      */
186     private MemcacheContent invalidChunk(Exception cause) {
187         state = State.BAD_MESSAGE;
188         MemcacheContent chunk = new DefaultLastMemcacheContent(Unpooled.EMPTY_BUFFER);
189         chunk.setDecoderResult(DecoderResult.failure(cause));
190         return chunk;
191     }
192 
193     /**
194      * When the channel goes inactive, release all frames to prevent data leaks.
195      *
196      * @param ctx handler context
197      * @throws Exception
198      */
199     @Override
200     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
201         super.channelInactive(ctx);
202 
203         if (currentMessage != null) {
204             currentMessage.release();
205         }
206 
207         resetDecoder();
208     }
209 
210     /**
211      * Prepare for next decoding iteration.
212      */
213     protected void resetDecoder() {
214         currentMessage = null;
215         alreadyReadChunkSize = 0;
216     }
217 
218     /**
219      * Decode and return the parsed {@link BinaryMemcacheMessage}.
220      *
221      * @param in the incoming buffer.
222      * @return the decoded header.
223      */
224     protected abstract M decodeHeader(ByteBuf in);
225 
226     /**
227      * Helper method to create a upstream message when the incoming parsing did fail.
228      *
229      * @return a message indicating a decoding failure.
230      */
231     protected abstract M buildInvalidMessage();
232 
233     /**
234      * Contains all states this decoder can possibly be in.
235      * <p/>
236      * Note that most of the states can be optional, the only one required is reading
237      * the header ({@link #READ_HEADER}. All other steps depend on the length fields
238      * in the header and will be executed conditionally.
239      */
240     enum State {
241         /**
242          * Currently reading the header portion.
243          */
244         READ_HEADER,
245 
246         /**
247          * Currently reading the extras portion (optional).
248          */
249         READ_EXTRAS,
250 
251         /**
252          * Currently reading the key portion (optional).
253          */
254         READ_KEY,
255 
256         /**
257          * Currently reading the value chunks (optional).
258          */
259         READ_CONTENT,
260 
261         /**
262          * Something went wrong while decoding the message or chunks.
263          */
264         BAD_MESSAGE
265     }
266 
267 }