View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.ByteToMessageDecoder;
21  
22  import java.util.List;
23  
24  import static io.netty.handler.codec.compression.Snappy.validateChecksum;
25  
26  /**
27   * Uncompresses a {@link ByteBuf} encoded with the Snappy framing format.
28   *
29   * See <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format</a>.
30   *
31   * Note that by default, validation of the checksum header in each chunk is
32   * DISABLED for performance improvements. If performance is less of an issue,
33   * or if you would prefer the safety that checksum validation brings, please
34   * use the {@link #SnappyFrameDecoder(boolean)} constructor with the argument
35   * set to {@code true}.
36   */
37  public class SnappyFrameDecoder extends ByteToMessageDecoder {
38  
39      private enum ChunkType {
40          STREAM_IDENTIFIER,
41          COMPRESSED_DATA,
42          UNCOMPRESSED_DATA,
43          RESERVED_UNSKIPPABLE,
44          RESERVED_SKIPPABLE
45      }
46  
47      private static final int SNAPPY_IDENTIFIER_LEN = 6;
48      private static final int MAX_UNCOMPRESSED_DATA_SIZE = 65536 + 4;
49  
50      private final Snappy snappy = new Snappy();
51      private final boolean validateChecksums;
52  
53      private boolean started;
54      private boolean corrupted;
55  
56      /**
57       * Creates a new snappy-framed decoder with validation of checksums
58       * turned OFF. To turn checksum validation on, please use the alternate
59       * {@link #SnappyFrameDecoder(boolean)} constructor.
60       */
61      public SnappyFrameDecoder() {
62          this(false);
63      }
64  
65      /**
66       * Creates a new snappy-framed decoder with validation of checksums
67       * as specified.
68       *
69       * @param validateChecksums
70       *        If true, the checksum field will be validated against the actual
71       *        uncompressed data, and if the checksums do not match, a suitable
72       *        {@link DecompressionException} will be thrown
73       */
74      public SnappyFrameDecoder(boolean validateChecksums) {
75          this.validateChecksums = validateChecksums;
76      }
77  
78      @Override
79      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
80          if (corrupted) {
81              in.skipBytes(in.readableBytes());
82              return;
83          }
84  
85          try {
86              int idx = in.readerIndex();
87              final int inSize = in.readableBytes();
88              if (inSize < 4) {
89                  // We need to be at least able to read the chunk type identifier (one byte),
90                  // and the length of the chunk (3 bytes) in order to proceed
91                  return;
92              }
93  
94              final int chunkTypeVal = in.getUnsignedByte(idx);
95              final ChunkType chunkType = mapChunkType((byte) chunkTypeVal);
96              final int chunkLength = in.getUnsignedMediumLE(idx + 1);
97  
98              switch (chunkType) {
99                  case STREAM_IDENTIFIER:
100                     if (chunkLength != SNAPPY_IDENTIFIER_LEN) {
101                         throw new DecompressionException("Unexpected length of stream identifier: " + chunkLength);
102                     }
103 
104                     if (inSize < 4 + SNAPPY_IDENTIFIER_LEN) {
105                         break;
106                     }
107 
108                     in.skipBytes(4);
109                     int offset = in.readerIndex();
110                     in.skipBytes(SNAPPY_IDENTIFIER_LEN);
111 
112                     checkByte(in.getByte(offset++), (byte) 's');
113                     checkByte(in.getByte(offset++), (byte) 'N');
114                     checkByte(in.getByte(offset++), (byte) 'a');
115                     checkByte(in.getByte(offset++), (byte) 'P');
116                     checkByte(in.getByte(offset++), (byte) 'p');
117                     checkByte(in.getByte(offset), (byte) 'Y');
118 
119                     started = true;
120                     break;
121                 case RESERVED_SKIPPABLE:
122                     if (!started) {
123                         throw new DecompressionException("Received RESERVED_SKIPPABLE tag before STREAM_IDENTIFIER");
124                     }
125 
126                     if (inSize < 4 + chunkLength) {
127                         // TODO: Don't keep skippable bytes
128                         return;
129                     }
130 
131                     in.skipBytes(4 + chunkLength);
132                     break;
133                 case RESERVED_UNSKIPPABLE:
134                     // The spec mandates that reserved unskippable chunks must immediately
135                     // return an error, as we must assume that we cannot decode the stream
136                     // correctly
137                     throw new DecompressionException(
138                             "Found reserved unskippable chunk type: 0x" + Integer.toHexString(chunkTypeVal));
139                 case UNCOMPRESSED_DATA:
140                     if (!started) {
141                         throw new DecompressionException("Received UNCOMPRESSED_DATA tag before STREAM_IDENTIFIER");
142                     }
143                     if (chunkLength > MAX_UNCOMPRESSED_DATA_SIZE) {
144                         throw new DecompressionException("Received UNCOMPRESSED_DATA larger than 65540 bytes");
145                     }
146 
147                     if (inSize < 4 + chunkLength) {
148                         return;
149                     }
150 
151                     in.skipBytes(4);
152                     if (validateChecksums) {
153                         int checksum = in.readIntLE();
154                         validateChecksum(checksum, in, in.readerIndex(), chunkLength - 4);
155                     } else {
156                         in.skipBytes(4);
157                     }
158                     out.add(in.readRetainedSlice(chunkLength - 4));
159                     break;
160                 case COMPRESSED_DATA:
161                     if (!started) {
162                         throw new DecompressionException("Received COMPRESSED_DATA tag before STREAM_IDENTIFIER");
163                     }
164 
165                     if (inSize < 4 + chunkLength) {
166                         return;
167                     }
168 
169                     in.skipBytes(4);
170                     int checksum = in.readIntLE();
171                     ByteBuf uncompressed = ctx.alloc().buffer();
172                     try {
173                         if (validateChecksums) {
174                             int oldWriterIndex = in.writerIndex();
175                             try {
176                                 in.writerIndex(in.readerIndex() + chunkLength - 4);
177                                 snappy.decode(in, uncompressed);
178                             } finally {
179                                 in.writerIndex(oldWriterIndex);
180                             }
181                             validateChecksum(checksum, uncompressed, 0, uncompressed.writerIndex());
182                         } else {
183                             snappy.decode(in.readSlice(chunkLength - 4), uncompressed);
184                         }
185                         out.add(uncompressed);
186                         uncompressed = null;
187                     } finally {
188                         if (uncompressed != null) {
189                             uncompressed.release();
190                         }
191                     }
192                     snappy.reset();
193                     break;
194             }
195         } catch (Exception e) {
196             corrupted = true;
197             throw e;
198         }
199     }
200 
201     private static void checkByte(byte actual, byte expect) {
202         if (actual != expect) {
203             throw new DecompressionException("Unexpected stream identifier contents. Mismatched snappy " +
204                     "protocol version?");
205         }
206     }
207 
208     /**
209      * Decodes the chunk type from the type tag byte.
210      *
211      * @param type The tag byte extracted from the stream
212      * @return The appropriate {@link ChunkType}, defaulting to {@link ChunkType#RESERVED_UNSKIPPABLE}
213      */
214     private static ChunkType mapChunkType(byte type) {
215         if (type == 0) {
216             return ChunkType.COMPRESSED_DATA;
217         } else if (type == 1) {
218             return ChunkType.UNCOMPRESSED_DATA;
219         } else if (type == (byte) 0xff) {
220             return ChunkType.STREAM_IDENTIFIER;
221         } else if ((type & 0x80) == 0x80) {
222             return ChunkType.RESERVED_SKIPPABLE;
223         } else {
224             return ChunkType.RESERVED_UNSKIPPABLE;
225         }
226     }
227 }