1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.handler.codec.compression;
17
18 import io.netty5.buffer.BufferUtil;
19 import io.netty5.buffer.api.Buffer;
20 import io.netty5.buffer.api.BufferAllocator;
21
22 import java.util.function.Supplier;
23
24 import static io.netty5.handler.codec.compression.Snappy.validateChecksum;
25
26 /**
27 * Uncompresses a {@link Buffer} encoded with the Snappy framing format.
28 *
29 * See <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format</a>.
30 *
31 * Note that by default, validation of the checksum header in each chunk is
32 * DISABLED for performance improvements. If performance is less of an issue,
33 * or if you would prefer the safety that checksum validation brings, please
34 * use the {@link #SnappyDecompressor(boolean)} constructor with the argument
35 * set to {@code true}.
36 */
37 public final class SnappyDecompressor implements Decompressor {
38 private enum ChunkType {
39 STREAM_IDENTIFIER,
40 COMPRESSED_DATA,
41 UNCOMPRESSED_DATA,
42 RESERVED_UNSKIPPABLE,
43 RESERVED_SKIPPABLE,
44 CORRUPTED,
45 FINISHED,
46 }
47
48 private static final int SNAPPY_IDENTIFIER_LEN = 6;
49 // See https://github.com/google/snappy/blob/1.1.9/framing_format.txt#L95
50 private static final int MAX_UNCOMPRESSED_DATA_SIZE = 65536 + 4;
51 // See https://github.com/google/snappy/blob/1.1.9/framing_format.txt#L82
52 private static final int MAX_DECOMPRESSED_DATA_SIZE = 65536;
53 // See https://github.com/google/snappy/blob/1.1.9/framing_format.txt#L82
54 private static final int MAX_COMPRESSED_CHUNK_SIZE = 16777216 - 1;
55
56 private final Snappy snappy = new Snappy();
57 private final boolean validateChecksums;
58
59 private boolean started;
60 private int numBytesToSkip;
61 private State state = State.DECODING;
62
63 private enum State {
64 DECODING,
65 FINISHED,
66 CORRUPTED,
67 CLOSED
68 }
69
70 /**
71 * Creates a new snappy decompressor with validation of checksums
72 * as specified.
73 *
74 * @param validateChecksums
75 * If true, the checksum field will be validated against the actual
76 * uncompressed data, and if the checksums do not match, a suitable
77 * {@link DecompressionException} will be thrown
78 */
79 private SnappyDecompressor(boolean validateChecksums) {
80 this.validateChecksums = validateChecksums;
81 }
82
83 /**
84 * Creates a new snappy decompressor factory with validation of checksums
85 * turned OFF. To turn checksum validation on, please use the alternate
86 * {@link #SnappyDecompressor(boolean)} constructor.
87 *
88 * @return the factory.
89 */
90 public static Supplier<SnappyDecompressor> newFactory() {
91 return newFactory(false);
92 }
93
94 /**
95 * Creates a new snappy decompressor factory with validation of checksums
96 * as specified.
97 *
98 * @param validateChecksums
99 * If true, the checksum field will be validated against the actual
100 * uncompressed data, and if the checksums do not match, a suitable
101 * {@link DecompressionException} will be thrown
102 * @return the factory.
103 */
104 public static Supplier<SnappyDecompressor> newFactory(boolean validateChecksums) {
105 return () -> new SnappyDecompressor(validateChecksums);
106 }
107
108 @Override
109 public Buffer decompress(Buffer in, BufferAllocator allocator)
110 throws DecompressionException {
111 switch (state) {
112 case FINISHED:
113 case CORRUPTED:
114 return allocator.allocate(0);
115 case CLOSED:
116 throw new DecompressionException("Decompressor closed");
117 case DECODING:
118 if (numBytesToSkip != 0) {
119 // The last chunkType we detected was RESERVED_SKIPPABLE and we still have some bytes to skip.
120 int skipBytes = Math.min(numBytesToSkip, in.readableBytes());
121 in.skipReadableBytes(skipBytes);
122 numBytesToSkip -= skipBytes;
123
124 // Let's return and try again.
125 return null;
126 }
127
128 int idx = in.readerOffset();
129 final int inSize = in.readableBytes();
130 if (inSize < 4) {
131 // We need to be at least able to read the chunk type identifier (one byte),
132 // and the length of the chunk (3 bytes) in order to proceed
133 return null;
134 }
135
136 final int chunkTypeVal = in.getUnsignedByte(idx);
137 final ChunkType chunkType = mapChunkType((byte) chunkTypeVal);
138 final int chunkLength = BufferUtil.reverseUnsignedMedium(in.getUnsignedMedium(idx + 1));
139
140 switch (chunkType) {
141 case STREAM_IDENTIFIER:
142 if (chunkLength != SNAPPY_IDENTIFIER_LEN) {
143 streamCorrupted("Unexpected length of stream identifier: " + chunkLength);
144 }
145
146 if (inSize < 4 + SNAPPY_IDENTIFIER_LEN) {
147 return null;
148 }
149
150 in.skipReadableBytes(4);
151 int offset = in.readerOffset();
152 in.skipReadableBytes(SNAPPY_IDENTIFIER_LEN);
153
154 checkByte(in.getByte(offset++), (byte) 's');
155 checkByte(in.getByte(offset++), (byte) 'N');
156 checkByte(in.getByte(offset++), (byte) 'a');
157 checkByte(in.getByte(offset++), (byte) 'P');
158 checkByte(in.getByte(offset++), (byte) 'p');
159 checkByte(in.getByte(offset), (byte) 'Y');
160
161 started = true;
162 return null;
163 case RESERVED_SKIPPABLE:
164 if (!started) {
165 streamCorrupted("Received RESERVED_SKIPPABLE tag before STREAM_IDENTIFIER");
166 }
167
168 in.skipReadableBytes(4);
169
170 int skipBytes = Math.min(chunkLength, in.readableBytes());
171 in.skipReadableBytes(skipBytes);
172 if (skipBytes != chunkLength) {
173 // We could skip all bytes, let's store the remaining so we can do so once we receive more
174 // data.
175 numBytesToSkip = chunkLength - skipBytes;
176 }
177 return null;
178 case RESERVED_UNSKIPPABLE:
179 // The spec mandates that reserved unskippable chunks must immediately
180 // return an error, as we must assume that we cannot decode the stream
181 // correctly
182 streamCorrupted(
183 "Found reserved unskippable chunk type: 0x" + Integer.toHexString(chunkTypeVal));
184 case UNCOMPRESSED_DATA:
185 if (!started) {
186 streamCorrupted("Received UNCOMPRESSED_DATA tag before STREAM_IDENTIFIER");
187 }
188 if (chunkLength > MAX_UNCOMPRESSED_DATA_SIZE) {
189 streamCorrupted("Received UNCOMPRESSED_DATA larger than " +
190 MAX_UNCOMPRESSED_DATA_SIZE + " bytes");
191 }
192
193 if (inSize < 4 + chunkLength) {
194 return null;
195 }
196
197 in.skipReadableBytes(4);
198 if (validateChecksums) {
199 int checksum = Integer.reverseBytes(in.readInt());
200 try {
201 validateChecksum(checksum, in, in.readerOffset(), chunkLength - 4);
202 } catch (DecompressionException e) {
203 state = State.CORRUPTED;
204 throw e;
205 }
206 } else {
207 in.skipReadableBytes(4);
208 }
209 return in.readSplit(chunkLength - 4);
210 case COMPRESSED_DATA:
211 if (!started) {
212 streamCorrupted("Received COMPRESSED_DATA tag before STREAM_IDENTIFIER");
213 }
214
215 if (chunkLength > MAX_COMPRESSED_CHUNK_SIZE) {
216 streamCorrupted("Received COMPRESSED_DATA that contains" +
217 " chunk that exceeds " + MAX_COMPRESSED_CHUNK_SIZE + " bytes");
218 }
219
220 if (inSize < 4 + chunkLength) {
221 return null;
222 }
223
224 in.skipReadableBytes(4);
225 int checksum = Integer.reverseBytes(in.readInt());
226
227 int uncompressedSize = snappy.getPreamble(in);
228 if (uncompressedSize > MAX_DECOMPRESSED_DATA_SIZE) {
229 streamCorrupted("Received COMPRESSED_DATA that contains" +
230 " uncompressed data that exceeds " + MAX_DECOMPRESSED_DATA_SIZE + " bytes");
231 }
232
233 Buffer uncompressed = allocator.allocate(uncompressedSize);
234 uncompressed.implicitCapacityLimit(MAX_DECOMPRESSED_DATA_SIZE);
235 try {
236 if (validateChecksums) {
237 int oldWriterIndex = in.writerOffset();
238 try {
239 in.writerOffset(in.readerOffset() + chunkLength - 4);
240 snappy.decode(in, uncompressed);
241 } finally {
242 in.writerOffset(oldWriterIndex);
243 }
244 try {
245 validateChecksum(checksum, uncompressed, 0, uncompressed.writerOffset());
246 } catch (DecompressionException e) {
247 state = State.CORRUPTED;
248 throw e;
249 }
250 } else {
251 try (Buffer slice = in.readSplit(chunkLength - 4)) {
252 snappy.decode(slice, uncompressed);
253 }
254 }
255 snappy.reset();
256 Buffer buffer = uncompressed;
257 uncompressed = null;
258 return buffer;
259 } finally {
260 if (uncompressed != null) {
261 uncompressed.close();
262 }
263 }
264 default:
265 streamCorrupted("Unexpected state");
266 return null;
267 }
268 default:
269 throw new IllegalStateException();
270 }
271 }
272
273 private static void checkByte(byte actual, byte expect) {
274 if (actual != expect) {
275 throw new DecompressionException("Unexpected stream identifier contents. Mismatched snappy " +
276 "protocol version?");
277 }
278 }
279
280 /**
281 * Decodes the chunk type from the type tag byte.
282 *
283 * @param type The tag byte extracted from the stream
284 * @return The appropriate {@link ChunkType}, defaulting to {@link ChunkType#RESERVED_UNSKIPPABLE}
285 */
286 private static ChunkType mapChunkType(byte type) {
287 if (type == 0) {
288 return ChunkType.COMPRESSED_DATA;
289 } else if (type == 1) {
290 return ChunkType.UNCOMPRESSED_DATA;
291 } else if (type == (byte) 0xff) {
292 return ChunkType.STREAM_IDENTIFIER;
293 } else if ((type & 0x80) == 0x80) {
294 return ChunkType.RESERVED_SKIPPABLE;
295 } else {
296 return ChunkType.RESERVED_UNSKIPPABLE;
297 }
298 }
299
300 private void streamCorrupted(String message) {
301 state = State.CORRUPTED;
302 throw new DecompressionException(message);
303 }
304
305 @Override
306 public boolean isFinished() {
307 switch (state) {
308 case FINISHED:
309 case CLOSED:
310 case CORRUPTED:
311 return true;
312 default:
313 return false;
314 }
315 }
316
317 @Override
318 public boolean isClosed() {
319 return state == State.CLOSED;
320 }
321
322 @Override
323 public void close() {
324 state = State.FINISHED;
325 }
326 }