1 /*
2 * Copyright 2014 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.handler.codec.ByteToMessageDecoder;
21 import io.netty.util.internal.ObjectUtil;
22 import net.jpountz.lz4.LZ4Exception;
23 import net.jpountz.lz4.LZ4Factory;
24 import net.jpountz.lz4.LZ4FastDecompressor;
25
26 import java.util.List;
27 import java.util.zip.Checksum;
28
29 import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
30 import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_NON_COMPRESSED;
31 import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSION_LEVEL_BASE;
32 import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
33 import static io.netty.handler.codec.compression.Lz4Constants.HEADER_LENGTH;
34 import static io.netty.handler.codec.compression.Lz4Constants.MAGIC_NUMBER;
35 import static io.netty.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
36
37 /**
38 * Uncompresses a {@link ByteBuf} encoded with the LZ4 format.
39 *
40 * See original <a href="https://github.com/Cyan4973/lz4">LZ4 Github project</a>
41 * and <a href="https://fastcompression.blogspot.ru/2011/05/lz4-explained.html">LZ4 block format</a>
42 * for full description.
43 *
 * Since the original LZ4 block format does not contain the size of the compressed block or the
 * size of the original data, this decoder uses a format like the <a href="https://github.com/idelpivnitskiy/lz4-java">LZ4 Java</a>
 * library written by Adrien Grand and approved by Yann Collet (author of the original LZ4 library).
47 *
48 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
49 * * Magic * Token * Compressed * Decompressed * Checksum * + * LZ4 compressed *
50 * * * * length * length * * * block *
51 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
52 */
public class Lz4FrameDecoder extends ByteToMessageDecoder {
    /**
     * Upper bound (in bytes) accepted for the decompressed length of a single block.
     * Fixed at construction time; defaults to {@link Lz4Constants#MAX_BLOCK_SIZE} when {@code 0} is given.
     */
    private final int maxDecompressedLength;
    /**
     * Current state of stream.
     */
    private enum State {
        INIT_BLOCK,
        DECOMPRESS_DATA,
        FINISHED,
        CORRUPTED
    }

    private State currentState = State.INIT_BLOCK;

    /**
     * Underlying decompressor in use.
     * Set to {@code null} once the end-of-stream frame has been read, so it can be garbage collected.
     */
    private LZ4FastDecompressor decompressor;

    /**
     * Underlying checksum calculator in use.
     * {@code null} when checksum validation is disabled; also nulled out at end of stream.
     */
    private ByteBufChecksum checksum;

    /**
     * Type of current block.
     */
    private int blockType;

    /**
     * Compressed length of current incoming block.
     */
    private int compressedLength;

    /**
     * Decompressed length of current incoming block.
     */
    private int decompressedLength;

    /**
     * Checksum value of current incoming block.
     */
    private int currentChecksum;

    /**
     * Creates the fastest LZ4 decoder.
     *
     * Note that by default, validation of the checksum header in each chunk is
     * DISABLED for performance improvements. If performance is less of an issue,
     * or if you would prefer the safety that checksum validation brings, please
     * use the {@link #Lz4FrameDecoder(boolean)} constructor with the argument
     * set to {@code true}.
     */
    public Lz4FrameDecoder() {
        this(false);
    }

    /**
     * Creates a LZ4 decoder with fastest decoder instance available on your machine.
     *
     * @param validateChecksums if {@code true}, the checksum field will be validated against the actual
     *                          uncompressed data, and if the checksums do not match, a suitable
     *                          {@link DecompressionException} will be thrown
     */
    public Lz4FrameDecoder(boolean validateChecksums) {
        this(LZ4Factory.fastestInstance(), validateChecksums);
    }

    /**
     * Creates a LZ4 decoder with fastest decoder instance available on your machine.
     *
     * @param validateChecksums if {@code true}, the checksum field will be validated against the actual
     *                          uncompressed data, and if the checksums do not match, a suitable
     *                          {@link DecompressionException} will be thrown
     * @param maxDecompressedLength
     *                          maximum length of the decompressed block. If {@code 0} is given it uses {@code 32MB}
     *                          by default.
     */
    public Lz4FrameDecoder(boolean validateChecksums, int maxDecompressedLength) {
        this(LZ4Factory.fastestInstance(), validateChecksums ? new Lz4XXHash32(DEFAULT_SEED) : null,
                maxDecompressedLength);
    }

    /**
     * Creates a new LZ4 decoder with customizable implementation.
     *
     * @param factory           user customizable {@link LZ4Factory} instance
     *                          which may be JNI bindings to the original C implementation, a pure Java implementation
     *                          or a Java implementation that uses the {@link sun.misc.Unsafe}
     * @param validateChecksums if {@code true}, the checksum field will be validated against the actual
     *                          uncompressed data, and if the checksums do not match, a suitable
     *                          {@link DecompressionException} will be thrown. In this case encoder will use
     *                          xxhash hashing for Java, based on Yann Collet's work available at
     *                          <a href="https://github.com/Cyan4973/xxHash">Github</a>.
     */
    public Lz4FrameDecoder(LZ4Factory factory, boolean validateChecksums) {
        this(factory, validateChecksums ? new Lz4XXHash32(DEFAULT_SEED) : null);
    }

    /**
     * Creates a new customizable LZ4 decoder.
     *
     * @param factory   user customizable {@link LZ4Factory} instance
     *                  which may be JNI bindings to the original C implementation, a pure Java implementation
     *                  or a Java implementation that uses the {@link sun.misc.Unsafe}
     * @param checksum  the {@link Checksum} instance to use to check data for integrity.
     *                  You may set {@code null} if you do not want to validate checksum of each block
     */
    public Lz4FrameDecoder(LZ4Factory factory, Checksum checksum) {
        this(factory, checksum, MAX_BLOCK_SIZE);
    }

    /**
     * Creates a new customizable LZ4 decoder.
     *
     * @param factory   user customizable {@link LZ4Factory} instance
     *                  which may be JNI bindings to the original C implementation, a pure Java implementation
     *                  or a Java implementation that uses the {@link sun.misc.Unsafe}
     * @param checksum  the {@link Checksum} instance to use to check data for integrity.
     *                  You may set {@code null} if you do not want to validate checksum of each block
     * @param maxDecompressedLength
     *                  maximum length of the decompressed block. If {@code 0} is given it uses {@code 32MB} by default.
     */
    public Lz4FrameDecoder(LZ4Factory factory, Checksum checksum, int maxDecompressedLength) {
        decompressor = ObjectUtil.checkNotNull(factory, "factory").fastDecompressor();
        // Wrap the plain Checksum so it can consume ByteBuf contents directly (avoids extra copies).
        this.checksum = checksum == null ? null : ByteBufChecksum.wrapChecksum(checksum);
        // 0 is a sentinel meaning "use the default"; otherwise the value must lie within (0, MAX_BLOCK_SIZE].
        this.maxDecompressedLength = maxDecompressedLength == 0 ? MAX_BLOCK_SIZE :
                ObjectUtil.checkInRange(maxDecompressedLength, 0, MAX_BLOCK_SIZE, "maxDecompressedLength");
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // State machine driven framing decoder. Any exception flips the state to CORRUPTED so that
        // all further input is discarded (see the FINISHED/CORRUPTED case and the outer catch).
        try {
            switch (currentState) {
            case INIT_BLOCK:
                // Frame header layout (HEADER_LENGTH = 21 bytes):
                //   8-byte magic | 1-byte token | 4-byte compressed length | 4-byte decompressed length
                //   | 4-byte checksum
                // The three int fields are little-endian on the wire, hence Integer.reverseBytes below.
                if (in.readableBytes() < HEADER_LENGTH) {
                    break;
                }
                final long magic = in.readLong();
                if (magic != MAGIC_NUMBER) {
                    throw new DecompressionException("unexpected block identifier");
                }

                // Token packs the compression level (low nibble) and the block type (high nibble).
                final int token = in.readByte();
                final int compressionLevel = (token & 0x0F) + COMPRESSION_LEVEL_BASE;
                int blockType = token & 0xF0;

                int compressedLength = Integer.reverseBytes(in.readInt());
                if (compressedLength < 0 || compressedLength > MAX_BLOCK_SIZE) {
                    throw new DecompressionException(String.format(
                            "invalid compressedLength: %d (expected: 0-%d)",
                            compressedLength, MAX_BLOCK_SIZE));
                }

                int decompressedLength = Integer.reverseBytes(in.readInt());
                // Enforce the user-configured cap first; this guards against allocation bombs from
                // attacker-controlled headers before any buffer is allocated.
                if (decompressedLength > maxDecompressedLength) {
                    throw new DecompressionException(String.format(
                            "decompressedLength too large: %d (expected: 0-%d)",
                            decompressedLength, maxDecompressedLength));
                }

                // The compression level encodes the encoder's block size (2^compressionLevel); the
                // decompressed length can never legitimately exceed it (also catches negative values).
                final int maxLocalDecompressedLength = 1 << compressionLevel;
                if (decompressedLength < 0 || decompressedLength > maxLocalDecompressedLength) {
                    throw new DecompressionException(String.format(
                            "invalid decompressedLength: %d (expected: 0-%d)",
                            decompressedLength, maxLocalDecompressedLength));
                }
                // Cross-check the two lengths: both must be zero together (end-of-stream frame), and a
                // non-compressed block must have identical lengths.
                if (decompressedLength == 0 && compressedLength != 0
                        || decompressedLength != 0 && compressedLength == 0
                        || blockType == BLOCK_TYPE_NON_COMPRESSED && decompressedLength != compressedLength) {
                    throw new DecompressionException(String.format(
                            "stream corrupted: compressedLength(%d) and decompressedLength(%d) mismatch",
                            compressedLength, decompressedLength));
                }

                int currentChecksum = Integer.reverseBytes(in.readInt());
                if (decompressedLength == 0 && compressedLength == 0) {
                    // Empty block marks the end of the compressed stream; its checksum must be zero.
                    if (currentChecksum != 0) {
                        throw new DecompressionException("stream corrupted: checksum error");
                    }
                    currentState = State.FINISHED;
                    // Release references so the (possibly native-backed) helpers can be collected.
                    decompressor = null;
                    checksum = null;
                    break;
                }

                // Stash the parsed header so DECOMPRESS_DATA can resume on a later decode() call
                // if the block payload has not fully arrived yet.
                this.blockType = blockType;
                this.compressedLength = compressedLength;
                this.decompressedLength = decompressedLength;
                this.currentChecksum = currentChecksum;

                currentState = State.DECOMPRESS_DATA;
                // fall through
            case DECOMPRESS_DATA:
                blockType = this.blockType;
                compressedLength = this.compressedLength;
                decompressedLength = this.decompressedLength;
                currentChecksum = this.currentChecksum;

                // Wait until the whole block payload is buffered.
                if (in.readableBytes() < compressedLength) {
                    break;
                }

                final ByteBufChecksum checksum = this.checksum;
                ByteBuf uncompressed = null;

                try {
                    switch (blockType) {
                    case BLOCK_TYPE_NON_COMPRESSED:
                        // Just pass through, we not update the readerIndex yet as we do this outside of the
                        // switch statement.
                        uncompressed = in.retainedSlice(in.readerIndex(), decompressedLength);
                        break;
                    case BLOCK_TYPE_COMPRESSED:
                        // Exact-capacity buffer: decompressedLength is both initial and max capacity.
                        uncompressed = ctx.alloc().buffer(decompressedLength, decompressedLength);

                        decompressor.decompress(CompressionUtil.safeReadableNioBuffer(in),
                                uncompressed.internalNioBuffer(uncompressed.writerIndex(), decompressedLength));
                        // Update the writerIndex now to reflect what we decompressed.
                        uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength);
                        break;
                    default:
                        throw new DecompressionException(String.format(
                                "unexpected blockType: %d (expected: %d or %d)",
                                blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
                    }
                    // Skip inbound bytes after we processed them.
                    in.skipBytes(compressedLength);

                    if (checksum != null) {
                        // Throws DecompressionException if the stored checksum does not match.
                        CompressionUtil.checkChecksum(checksum, uncompressed, currentChecksum);
                    }
                    out.add(uncompressed);
                    // Ownership transferred to the pipeline; clear the local so finally does not release it.
                    uncompressed = null;
                    currentState = State.INIT_BLOCK;
                } catch (LZ4Exception e) {
                    throw new DecompressionException(e);
                } finally {
                    // Release the buffer on any failure path to avoid a leak.
                    if (uncompressed != null) {
                        uncompressed.release();
                    }
                }
                break;
            case FINISHED:
            case CORRUPTED:
                // Stream is done (or unrecoverable): silently discard any further input.
                in.skipBytes(in.readableBytes());
                break;
            default:
                throw new IllegalStateException();
            }
        } catch (Exception e) {
            // Once corrupted, stay corrupted — subsequent input cannot be framed reliably.
            currentState = State.CORRUPTED;
            throw e;
        }
    }

    /**
     * Returns {@code true} if and only if the end of the compressed stream
     * has been reached.
     */
    public boolean isClosed() {
        return currentState == State.FINISHED;
    }
}