1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.compression;
17
18 import io.netty5.buffer.BufferUtil;
19 import io.netty5.buffer.api.Buffer;
20 import io.netty5.buffer.api.BufferAllocator;
21
22 import java.util.function.Supplier;
23
24 import static io.netty5.handler.codec.compression.Snappy.validateChecksum;
25
26
27
28
29
30
31
32
33
34
35
36
37 public final class SnappyDecompressor implements Decompressor {
38 private enum ChunkType {
39 STREAM_IDENTIFIER,
40 COMPRESSED_DATA,
41 UNCOMPRESSED_DATA,
42 RESERVED_UNSKIPPABLE,
43 RESERVED_SKIPPABLE,
44 CORRUPTED,
45 FINISHED,
46 }
47
48 private static final int SNAPPY_IDENTIFIER_LEN = 6;
49
50 private static final int MAX_UNCOMPRESSED_DATA_SIZE = 65536 + 4;
51
52 private static final int MAX_DECOMPRESSED_DATA_SIZE = 65536;
53
54 private static final int MAX_COMPRESSED_CHUNK_SIZE = 16777216 - 1;
55
56 private final Snappy snappy = new Snappy();
57 private final boolean validateChecksums;
58
59 private boolean started;
60 private int numBytesToSkip;
61 private State state = State.DECODING;
62
63 private enum State {
64 DECODING,
65 FINISHED,
66 CORRUPTED,
67 CLOSED
68 }
69
70
71
72
73
74
75
76
77
78
79 private SnappyDecompressor(boolean validateChecksums) {
80 this.validateChecksums = validateChecksums;
81 }
82
83
84
85
86
87
88
89
90 public static Supplier<SnappyDecompressor> newFactory() {
91 return newFactory(false);
92 }
93
94
95
96
97
98
99
100
101
102
103
104 public static Supplier<SnappyDecompressor> newFactory(boolean validateChecksums) {
105 return () -> new SnappyDecompressor(validateChecksums);
106 }
107
108 @Override
109 public Buffer decompress(Buffer in, BufferAllocator allocator)
110 throws DecompressionException {
111 switch (state) {
112 case FINISHED:
113 case CORRUPTED:
114 return allocator.allocate(0);
115 case CLOSED:
116 throw new DecompressionException("Decompressor closed");
117 case DECODING:
118 if (numBytesToSkip != 0) {
119
120 int skipBytes = Math.min(numBytesToSkip, in.readableBytes());
121 in.skipReadableBytes(skipBytes);
122 numBytesToSkip -= skipBytes;
123
124
125 return null;
126 }
127
128 int idx = in.readerOffset();
129 final int inSize = in.readableBytes();
130 if (inSize < 4) {
131
132
133 return null;
134 }
135
136 final int chunkTypeVal = in.getUnsignedByte(idx);
137 final ChunkType chunkType = mapChunkType((byte) chunkTypeVal);
138 final int chunkLength = BufferUtil.reverseUnsignedMedium(in.getUnsignedMedium(idx + 1));
139
140 switch (chunkType) {
141 case STREAM_IDENTIFIER:
142 if (chunkLength != SNAPPY_IDENTIFIER_LEN) {
143 streamCorrupted("Unexpected length of stream identifier: " + chunkLength);
144 }
145
146 if (inSize < 4 + SNAPPY_IDENTIFIER_LEN) {
147 return null;
148 }
149
150 in.skipReadableBytes(4);
151 int offset = in.readerOffset();
152 in.skipReadableBytes(SNAPPY_IDENTIFIER_LEN);
153
154 checkByte(in.getByte(offset++), (byte) 's');
155 checkByte(in.getByte(offset++), (byte) 'N');
156 checkByte(in.getByte(offset++), (byte) 'a');
157 checkByte(in.getByte(offset++), (byte) 'P');
158 checkByte(in.getByte(offset++), (byte) 'p');
159 checkByte(in.getByte(offset), (byte) 'Y');
160
161 started = true;
162 return null;
163 case RESERVED_SKIPPABLE:
164 if (!started) {
165 streamCorrupted("Received RESERVED_SKIPPABLE tag before STREAM_IDENTIFIER");
166 }
167
168 in.skipReadableBytes(4);
169
170 int skipBytes = Math.min(chunkLength, in.readableBytes());
171 in.skipReadableBytes(skipBytes);
172 if (skipBytes != chunkLength) {
173
174
175 numBytesToSkip = chunkLength - skipBytes;
176 }
177 return null;
178 case RESERVED_UNSKIPPABLE:
179
180
181
182 streamCorrupted(
183 "Found reserved unskippable chunk type: 0x" + Integer.toHexString(chunkTypeVal));
184 case UNCOMPRESSED_DATA:
185 if (!started) {
186 streamCorrupted("Received UNCOMPRESSED_DATA tag before STREAM_IDENTIFIER");
187 }
188 if (chunkLength > MAX_UNCOMPRESSED_DATA_SIZE) {
189 streamCorrupted("Received UNCOMPRESSED_DATA larger than " +
190 MAX_UNCOMPRESSED_DATA_SIZE + " bytes");
191 }
192
193 if (inSize < 4 + chunkLength) {
194 return null;
195 }
196
197 in.skipReadableBytes(4);
198 if (validateChecksums) {
199 int checksum = Integer.reverseBytes(in.readInt());
200 try {
201 validateChecksum(checksum, in, in.readerOffset(), chunkLength - 4);
202 } catch (DecompressionException e) {
203 state = State.CORRUPTED;
204 throw e;
205 }
206 } else {
207 in.skipReadableBytes(4);
208 }
209 return in.readSplit(chunkLength - 4);
210 case COMPRESSED_DATA:
211 if (!started) {
212 streamCorrupted("Received COMPRESSED_DATA tag before STREAM_IDENTIFIER");
213 }
214
215 if (chunkLength > MAX_COMPRESSED_CHUNK_SIZE) {
216 streamCorrupted("Received COMPRESSED_DATA that contains" +
217 " chunk that exceeds " + MAX_COMPRESSED_CHUNK_SIZE + " bytes");
218 }
219
220 if (inSize < 4 + chunkLength) {
221 return null;
222 }
223
224 in.skipReadableBytes(4);
225 int checksum = Integer.reverseBytes(in.readInt());
226
227 int uncompressedSize = snappy.getPreamble(in);
228 if (uncompressedSize > MAX_DECOMPRESSED_DATA_SIZE) {
229 streamCorrupted("Received COMPRESSED_DATA that contains" +
230 " uncompressed data that exceeds " + MAX_DECOMPRESSED_DATA_SIZE + " bytes");
231 }
232
233 Buffer uncompressed = allocator.allocate(uncompressedSize);
234 uncompressed.implicitCapacityLimit(MAX_DECOMPRESSED_DATA_SIZE);
235 try {
236 if (validateChecksums) {
237 int oldWriterIndex = in.writerOffset();
238 try {
239 in.writerOffset(in.readerOffset() + chunkLength - 4);
240 snappy.decode(in, uncompressed);
241 } finally {
242 in.writerOffset(oldWriterIndex);
243 }
244 try {
245 validateChecksum(checksum, uncompressed, 0, uncompressed.writerOffset());
246 } catch (DecompressionException e) {
247 state = State.CORRUPTED;
248 throw e;
249 }
250 } else {
251 try (Buffer slice = in.readSplit(chunkLength - 4)) {
252 snappy.decode(slice, uncompressed);
253 }
254 }
255 snappy.reset();
256 Buffer buffer = uncompressed;
257 uncompressed = null;
258 return buffer;
259 } finally {
260 if (uncompressed != null) {
261 uncompressed.close();
262 }
263 }
264 default:
265 streamCorrupted("Unexpected state");
266 return null;
267 }
268 default:
269 throw new IllegalStateException();
270 }
271 }
272
273 private static void checkByte(byte actual, byte expect) {
274 if (actual != expect) {
275 throw new DecompressionException("Unexpected stream identifier contents. Mismatched snappy " +
276 "protocol version?");
277 }
278 }
279
280
281
282
283
284
285
286 private static ChunkType mapChunkType(byte type) {
287 if (type == 0) {
288 return ChunkType.COMPRESSED_DATA;
289 } else if (type == 1) {
290 return ChunkType.UNCOMPRESSED_DATA;
291 } else if (type == (byte) 0xff) {
292 return ChunkType.STREAM_IDENTIFIER;
293 } else if ((type & 0x80) == 0x80) {
294 return ChunkType.RESERVED_SKIPPABLE;
295 } else {
296 return ChunkType.RESERVED_UNSKIPPABLE;
297 }
298 }
299
300 private void streamCorrupted(String message) {
301 state = State.CORRUPTED;
302 throw new DecompressionException(message);
303 }
304
305 @Override
306 public boolean isFinished() {
307 switch (state) {
308 case FINISHED:
309 case CLOSED:
310 case CORRUPTED:
311 return true;
312 default:
313 return false;
314 }
315 }
316
317 @Override
318 public boolean isClosed() {
319 return state == State.CLOSED;
320 }
321
322 @Override
323 public void close() {
324 state = State.FINISHED;
325 }
326 }