View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.compression;
17  
18  import io.netty5.buffer.BufferUtil;
19  import io.netty5.buffer.api.Buffer;
20  import io.netty5.buffer.api.BufferAllocator;
21  
22  import java.util.function.Supplier;
23  
24  import static io.netty5.handler.codec.compression.Snappy.validateChecksum;
25  
26  /**
27   * Uncompresses a {@link Buffer} encoded with the Snappy framing format.
28   *
29   * See <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format</a>.
30   *
31   * Note that by default, validation of the checksum header in each chunk is
32   * DISABLED for performance improvements. If performance is less of an issue,
33   * or if you would prefer the safety that checksum validation brings, please
34   * use the {@link #SnappyDecompressor(boolean)} constructor with the argument
35   * set to {@code true}.
36   */
37  public final class SnappyDecompressor implements Decompressor {
38      private enum ChunkType {
39          STREAM_IDENTIFIER,
40          COMPRESSED_DATA,
41          UNCOMPRESSED_DATA,
42          RESERVED_UNSKIPPABLE,
43          RESERVED_SKIPPABLE,
44          CORRUPTED,
45          FINISHED,
46      }
47  
48      private static final int SNAPPY_IDENTIFIER_LEN = 6;
49      // See https://github.com/google/snappy/blob/1.1.9/framing_format.txt#L95
50      private static final int MAX_UNCOMPRESSED_DATA_SIZE = 65536 + 4;
51      // See https://github.com/google/snappy/blob/1.1.9/framing_format.txt#L82
52      private static final int MAX_DECOMPRESSED_DATA_SIZE = 65536;
53      // See https://github.com/google/snappy/blob/1.1.9/framing_format.txt#L82
54      private static final int MAX_COMPRESSED_CHUNK_SIZE = 16777216 - 1;
55  
56      private final Snappy snappy = new Snappy();
57      private final boolean validateChecksums;
58  
59      private boolean started;
60      private int numBytesToSkip;
61      private State state = State.DECODING;
62  
63      private enum State {
64          DECODING,
65          FINISHED,
66          CORRUPTED,
67          CLOSED
68      }
69  
70      /**
71       * Creates a new snappy decompressor with validation of checksums
72       * as specified.
73       *
74       * @param validateChecksums
75       *        If true, the checksum field will be validated against the actual
76       *        uncompressed data, and if the checksums do not match, a suitable
77       *        {@link DecompressionException} will be thrown
78       */
79      private SnappyDecompressor(boolean validateChecksums) {
80          this.validateChecksums = validateChecksums;
81      }
82  
83      /**
84       * Creates a new snappy decompressor factory with validation of checksums
85       * turned OFF. To turn checksum validation on, please use the alternate
86       * {@link #SnappyDecompressor(boolean)} constructor.
87       *
88       * @return the factory.
89       */
90      public static Supplier<SnappyDecompressor> newFactory() {
91          return newFactory(false);
92      }
93  
94      /**
95       * Creates a new snappy decompressor factory with validation of checksums
96       * as specified.
97       *
98       * @param validateChecksums
99       *        If true, the checksum field will be validated against the actual
100      *        uncompressed data, and if the checksums do not match, a suitable
101      *        {@link DecompressionException} will be thrown
102      * @return the factory.
103      */
104     public static Supplier<SnappyDecompressor> newFactory(boolean validateChecksums) {
105         return () -> new SnappyDecompressor(validateChecksums);
106     }
107 
108     @Override
109     public Buffer decompress(Buffer in, BufferAllocator allocator)
110             throws DecompressionException {
111         switch (state) {
112             case FINISHED:
113             case CORRUPTED:
114                 return allocator.allocate(0);
115             case CLOSED:
116                 throw new DecompressionException("Decompressor closed");
117             case DECODING:
118                 if (numBytesToSkip != 0) {
119                     // The last chunkType we detected was RESERVED_SKIPPABLE and we still have some bytes to skip.
120                     int skipBytes = Math.min(numBytesToSkip, in.readableBytes());
121                     in.skipReadableBytes(skipBytes);
122                     numBytesToSkip -= skipBytes;
123 
124                     // Let's return and try again.
125                     return null;
126                 }
127 
128                 int idx = in.readerOffset();
129                 final int inSize = in.readableBytes();
130                 if (inSize < 4) {
131                     // We need to be at least able to read the chunk type identifier (one byte),
132                     // and the length of the chunk (3 bytes) in order to proceed
133                     return null;
134                 }
135 
136                 final int chunkTypeVal = in.getUnsignedByte(idx);
137                 final ChunkType chunkType = mapChunkType((byte) chunkTypeVal);
138                 final int chunkLength = BufferUtil.reverseUnsignedMedium(in.getUnsignedMedium(idx + 1));
139 
140                 switch (chunkType) {
141                     case STREAM_IDENTIFIER:
142                         if (chunkLength != SNAPPY_IDENTIFIER_LEN) {
143                             streamCorrupted("Unexpected length of stream identifier: " + chunkLength);
144                         }
145 
146                         if (inSize < 4 + SNAPPY_IDENTIFIER_LEN) {
147                             return null;
148                         }
149 
150                         in.skipReadableBytes(4);
151                         int offset = in.readerOffset();
152                         in.skipReadableBytes(SNAPPY_IDENTIFIER_LEN);
153 
154                         checkByte(in.getByte(offset++), (byte) 's');
155                         checkByte(in.getByte(offset++), (byte) 'N');
156                         checkByte(in.getByte(offset++), (byte) 'a');
157                         checkByte(in.getByte(offset++), (byte) 'P');
158                         checkByte(in.getByte(offset++), (byte) 'p');
159                         checkByte(in.getByte(offset), (byte) 'Y');
160 
161                         started = true;
162                         return null;
163                     case RESERVED_SKIPPABLE:
164                         if (!started) {
165                             streamCorrupted("Received RESERVED_SKIPPABLE tag before STREAM_IDENTIFIER");
166                         }
167 
168                         in.skipReadableBytes(4);
169 
170                         int skipBytes = Math.min(chunkLength, in.readableBytes());
171                         in.skipReadableBytes(skipBytes);
172                         if (skipBytes != chunkLength) {
173                             // We could skip all bytes, let's store the remaining so we can do so once we receive more
174                             // data.
175                             numBytesToSkip = chunkLength - skipBytes;
176                         }
177                         return null;
178                     case RESERVED_UNSKIPPABLE:
179                         // The spec mandates that reserved unskippable chunks must immediately
180                         // return an error, as we must assume that we cannot decode the stream
181                         // correctly
182                         streamCorrupted(
183                                 "Found reserved unskippable chunk type: 0x" + Integer.toHexString(chunkTypeVal));
184                     case UNCOMPRESSED_DATA:
185                         if (!started) {
186                             streamCorrupted("Received UNCOMPRESSED_DATA tag before STREAM_IDENTIFIER");
187                         }
188                         if (chunkLength > MAX_UNCOMPRESSED_DATA_SIZE) {
189                             streamCorrupted("Received UNCOMPRESSED_DATA larger than " +
190                                     MAX_UNCOMPRESSED_DATA_SIZE + " bytes");
191                         }
192 
193                         if (inSize < 4 + chunkLength) {
194                             return null;
195                         }
196 
197                         in.skipReadableBytes(4);
198                         if (validateChecksums) {
199                             int checksum = Integer.reverseBytes(in.readInt());
200                             try {
201                                 validateChecksum(checksum, in, in.readerOffset(), chunkLength - 4);
202                             } catch (DecompressionException e) {
203                                 state = State.CORRUPTED;
204                                 throw e;
205                             }
206                         } else {
207                             in.skipReadableBytes(4);
208                         }
209                         return in.readSplit(chunkLength - 4);
210                     case COMPRESSED_DATA:
211                         if (!started) {
212                             streamCorrupted("Received COMPRESSED_DATA tag before STREAM_IDENTIFIER");
213                         }
214 
215                         if (chunkLength > MAX_COMPRESSED_CHUNK_SIZE) {
216                             streamCorrupted("Received COMPRESSED_DATA that contains" +
217                                     " chunk that exceeds " + MAX_COMPRESSED_CHUNK_SIZE + " bytes");
218                         }
219 
220                         if (inSize < 4 + chunkLength) {
221                             return null;
222                         }
223 
224                         in.skipReadableBytes(4);
225                         int checksum = Integer.reverseBytes(in.readInt());
226 
227                         int uncompressedSize = snappy.getPreamble(in);
228                         if (uncompressedSize > MAX_DECOMPRESSED_DATA_SIZE) {
229                             streamCorrupted("Received COMPRESSED_DATA that contains" +
230                                     " uncompressed data that exceeds " + MAX_DECOMPRESSED_DATA_SIZE + " bytes");
231                         }
232 
233                         Buffer uncompressed = allocator.allocate(uncompressedSize);
234                         uncompressed.implicitCapacityLimit(MAX_DECOMPRESSED_DATA_SIZE);
235                         try {
236                             if (validateChecksums) {
237                                 int oldWriterIndex = in.writerOffset();
238                                 try {
239                                     in.writerOffset(in.readerOffset() + chunkLength - 4);
240                                     snappy.decode(in, uncompressed);
241                                 } finally {
242                                     in.writerOffset(oldWriterIndex);
243                                 }
244                                 try {
245                                     validateChecksum(checksum, uncompressed, 0, uncompressed.writerOffset());
246                                 } catch (DecompressionException e) {
247                                     state = State.CORRUPTED;
248                                     throw e;
249                                 }
250                             } else {
251                                 try (Buffer slice = in.readSplit(chunkLength - 4)) {
252                                     snappy.decode(slice, uncompressed);
253                                 }
254                             }
255                             snappy.reset();
256                             Buffer buffer = uncompressed;
257                             uncompressed = null;
258                             return buffer;
259                         } finally {
260                             if (uncompressed != null) {
261                                 uncompressed.close();
262                             }
263                         }
264                     default:
265                         streamCorrupted("Unexpected state");
266                         return null;
267                 }
268             default:
269                 throw new IllegalStateException();
270         }
271     }
272 
273     private static void checkByte(byte actual, byte expect) {
274         if (actual != expect) {
275             throw new DecompressionException("Unexpected stream identifier contents. Mismatched snappy " +
276                     "protocol version?");
277         }
278     }
279 
280     /**
281      * Decodes the chunk type from the type tag byte.
282      *
283      * @param type The tag byte extracted from the stream
284      * @return The appropriate {@link ChunkType}, defaulting to {@link ChunkType#RESERVED_UNSKIPPABLE}
285      */
286     private static ChunkType mapChunkType(byte type) {
287         if (type == 0) {
288             return ChunkType.COMPRESSED_DATA;
289         } else if (type == 1) {
290             return ChunkType.UNCOMPRESSED_DATA;
291         } else if (type == (byte) 0xff) {
292             return ChunkType.STREAM_IDENTIFIER;
293         } else if ((type & 0x80) == 0x80) {
294             return ChunkType.RESERVED_SKIPPABLE;
295         } else {
296             return ChunkType.RESERVED_UNSKIPPABLE;
297         }
298     }
299 
300     private void streamCorrupted(String message) {
301         state = State.CORRUPTED;
302         throw new DecompressionException(message);
303     }
304 
305     @Override
306     public boolean isFinished() {
307         switch (state) {
308             case FINISHED:
309             case CLOSED:
310             case CORRUPTED:
311                 return true;
312             default:
313                 return false;
314         }
315     }
316 
317     @Override
318     public boolean isClosed() {
319         return state == State.CLOSED;
320     }
321 
322     @Override
323     public void close() {
324         state = State.FINISHED;
325     }
326 }