View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufUtil;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.ByteToMessageDecoder;
22  
23  import java.util.Arrays;
24  import java.util.List;
25  
26  import static io.netty.handler.codec.compression.Snappy.*;
27  
28  /**
29   * Uncompresses a {@link ByteBuf} encoded with the Snappy framing format.
30   *
31   * See http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
32   *
33   * Note that by default, validation of the checksum header in each chunk is
34   * DISABLED for performance improvements. If performance is less of an issue,
35   * or if you would prefer the safety that checksum validation brings, please
36   * use the {@link #SnappyFrameDecoder(boolean)} constructor with the argument
37   * set to {@code true}.
38   */
39  public class SnappyFrameDecoder extends ByteToMessageDecoder {
40  
41      private enum ChunkType {
42          STREAM_IDENTIFIER,
43          COMPRESSED_DATA,
44          UNCOMPRESSED_DATA,
45          RESERVED_UNSKIPPABLE,
46          RESERVED_SKIPPABLE
47      }
48  
49      private static final byte[] SNAPPY = { 's', 'N', 'a', 'P', 'p', 'Y' };
50      private static final int MAX_UNCOMPRESSED_DATA_SIZE = 65536 + 4;
51  
52      private final Snappy snappy = new Snappy();
53      private final boolean validateChecksums;
54  
55      private boolean started;
56      private boolean corrupted;
57  
58      /**
59       * Creates a new snappy-framed decoder with validation of checksums
60       * turned OFF. To turn checksum validation on, please use the alternate
61       * {@link #SnappyFrameDecoder(boolean)} constructor.
62       */
63      public SnappyFrameDecoder() {
64          this(false);
65      }
66  
67      /**
68       * Creates a new snappy-framed decoder with validation of checksums
69       * as specified.
70       *
71       * @param validateChecksums
72       *        If true, the checksum field will be validated against the actual
73       *        uncompressed data, and if the checksums do not match, a suitable
74       *        {@link DecompressionException} will be thrown
75       */
76      public SnappyFrameDecoder(boolean validateChecksums) {
77          this.validateChecksums = validateChecksums;
78      }
79  
80      @Override
81      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
82          if (corrupted) {
83              in.skipBytes(in.readableBytes());
84              return;
85          }
86  
87          try {
88              int idx = in.readerIndex();
89              final int inSize = in.readableBytes();
90              if (inSize < 4) {
91                  // We need to be at least able to read the chunk type identifier (one byte),
92                  // and the length of the chunk (3 bytes) in order to proceed
93                  return;
94              }
95  
96              final int chunkTypeVal = in.getUnsignedByte(idx);
97              final ChunkType chunkType = mapChunkType((byte) chunkTypeVal);
98              final int chunkLength = ByteBufUtil.swapMedium(in.getUnsignedMedium(idx + 1));
99  
100             switch (chunkType) {
101                 case STREAM_IDENTIFIER:
102                     if (chunkLength != SNAPPY.length) {
103                         throw new DecompressionException("Unexpected length of stream identifier: " + chunkLength);
104                     }
105 
106                     if (inSize < 4 + SNAPPY.length) {
107                         break;
108                     }
109 
110                     byte[] identifier = new byte[chunkLength];
111                     in.skipBytes(4).readBytes(identifier);
112 
113                     if (!Arrays.equals(identifier, SNAPPY)) {
114                         throw new DecompressionException("Unexpected stream identifier contents. Mismatched snappy " +
115                                 "protocol version?");
116                     }
117 
118                     started = true;
119                     break;
120                 case RESERVED_SKIPPABLE:
121                     if (!started) {
122                         throw new DecompressionException("Received RESERVED_SKIPPABLE tag before STREAM_IDENTIFIER");
123                     }
124 
125                     if (inSize < 4 + chunkLength) {
126                         // TODO: Don't keep skippable bytes
127                         return;
128                     }
129 
130                     in.skipBytes(4 + chunkLength);
131                     break;
132                 case RESERVED_UNSKIPPABLE:
133                     // The spec mandates that reserved unskippable chunks must immediately
134                     // return an error, as we must assume that we cannot decode the stream
135                     // correctly
136                     throw new DecompressionException(
137                             "Found reserved unskippable chunk type: 0x" + Integer.toHexString(chunkTypeVal));
138                 case UNCOMPRESSED_DATA:
139                     if (!started) {
140                         throw new DecompressionException("Received UNCOMPRESSED_DATA tag before STREAM_IDENTIFIER");
141                     }
142                     if (chunkLength > MAX_UNCOMPRESSED_DATA_SIZE) {
143                         throw new DecompressionException("Received UNCOMPRESSED_DATA larger than 65540 bytes");
144                     }
145 
146                     if (inSize < 4 + chunkLength) {
147                         return;
148                     }
149 
150                     in.skipBytes(4);
151                     if (validateChecksums) {
152                         int checksum = ByteBufUtil.swapInt(in.readInt());
153                         validateChecksum(checksum, in, in.readerIndex(), chunkLength - 4);
154                     } else {
155                         in.skipBytes(4);
156                     }
157                     out.add(in.readSlice(chunkLength - 4).retain());
158                     break;
159                 case COMPRESSED_DATA:
160                     if (!started) {
161                         throw new DecompressionException("Received COMPRESSED_DATA tag before STREAM_IDENTIFIER");
162                     }
163 
164                     if (inSize < 4 + chunkLength) {
165                         return;
166                     }
167 
168                     in.skipBytes(4);
169                     int checksum = ByteBufUtil.swapInt(in.readInt());
170                     ByteBuf uncompressed = ctx.alloc().buffer(0);
171                     if (validateChecksums) {
172                         int oldWriterIndex = in.writerIndex();
173                         try {
174                             in.writerIndex(in.readerIndex() + chunkLength - 4);
175                             snappy.decode(in, uncompressed);
176                         } finally {
177                             in.writerIndex(oldWriterIndex);
178                         }
179                         validateChecksum(checksum, uncompressed, 0, uncompressed.writerIndex());
180                     } else {
181                         snappy.decode(in.readSlice(chunkLength - 4), uncompressed);
182                     }
183                     out.add(uncompressed);
184                     snappy.reset();
185                     break;
186             }
187         } catch (Exception e) {
188             corrupted = true;
189             throw e;
190         }
191     }
192 
193     /**
194      * Decodes the chunk type from the type tag byte.
195      *
196      * @param type The tag byte extracted from the stream
197      * @return The appropriate {@link ChunkType}, defaulting to {@link ChunkType#RESERVED_UNSKIPPABLE}
198      */
199     private static ChunkType mapChunkType(byte type) {
200         if (type == 0) {
201             return ChunkType.COMPRESSED_DATA;
202         } else if (type == 1) {
203             return ChunkType.UNCOMPRESSED_DATA;
204         } else if (type == (byte) 0xff) {
205             return ChunkType.STREAM_IDENTIFIER;
206         } else if ((type & 0x80) == 0x80) {
207             return ChunkType.RESERVED_SKIPPABLE;
208         } else {
209             return ChunkType.RESERVED_UNSKIPPABLE;
210         }
211     }
212 }