View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.compression;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import net.jpountz.lz4.LZ4Exception;
21  import net.jpountz.lz4.LZ4Factory;
22  import net.jpountz.lz4.LZ4FastDecompressor;
23  
24  import java.util.function.Supplier;
25  import java.util.zip.Checksum;
26  
27  import static io.netty5.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
28  import static io.netty5.handler.codec.compression.Lz4Constants.BLOCK_TYPE_NON_COMPRESSED;
29  import static io.netty5.handler.codec.compression.Lz4Constants.COMPRESSION_LEVEL_BASE;
30  import static io.netty5.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
31  import static io.netty5.handler.codec.compression.Lz4Constants.HEADER_LENGTH;
32  import static io.netty5.handler.codec.compression.Lz4Constants.MAGIC_NUMBER;
33  import static io.netty5.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
34  import static java.util.Objects.requireNonNull;
35  
36  /**
37   * Uncompresses a {@link Buffer} encoded with the LZ4 format.
38   *
39   * See original <a href="https://github.com/Cyan4973/lz4">LZ4 Github project</a>
40   * and <a href="https://fastcompression.blogspot.ru/2011/05/lz4-explained.html">LZ4 block format</a>
41   * for full description.
42   *
43   * Since the original LZ4 block format does not contains size of compressed block and size of original data
44   * this encoder uses format like <a href="https://github.com/idelpivnitskiy/lz4-java">LZ4 Java</a> library
45   * written by Adrien Grand and approved by Yann Collet (author of original LZ4 library).
46   *
47   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
48   *  * Magic * Token *  Compressed *  Decompressed *  Checksum *  +  *  LZ4 compressed *
49   *  *       *       *    length   *     length    *           *     *      block      *
50   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
51   */
52  public final class Lz4Decompressor implements Decompressor {
53      /**
54       * Current state of stream.
55       */
56      private enum State {
57          INIT_BLOCK,
58          DECOMPRESS_DATA,
59          FINISHED,
60          CORRUPTED,
61          CLOSED
62      }
63  
64      private State currentState = State.INIT_BLOCK;
65  
66      /**
67       * Underlying decompressor in use.
68       */
69      private LZ4FastDecompressor decompressor;
70  
71      /**
72       * Underlying checksum calculator in use.
73       */
74      private BufferChecksum checksum;
75  
76      /**
77       * Type of current block.
78       */
79      private int blockType;
80  
81      /**
82       * Compressed length of current incoming block.
83       */
84      private int compressedLength;
85  
86      /**
87       * Decompressed length of current incoming block.
88       */
89      private int decompressedLength;
90  
91      /**
92       * Checksum value of current incoming block.
93       */
94      private int currentChecksum;
95  
96      /**
97       * Creates a new customizable LZ4 decompressor factory.
98       *
99       * @param factory   user customizable {@link LZ4Factory} instance
100      *                  which may be JNI bindings to the original C implementation, a pure Java implementation
101      *                  or a Java implementation that uses the {@link sun.misc.Unsafe}
102      * @param checksum  the {@link Checksum} instance to use to check data for integrity.
103      *                  You may set {@code null} if you do not want to validate checksum of each block
104      */
105     private Lz4Decompressor(LZ4Factory factory, Checksum checksum) {
106         decompressor = factory.fastDecompressor();
107         this.checksum = checksum == null ? null : checksum instanceof Lz4XXHash32 ? (Lz4XXHash32) checksum :
108                 new BufferChecksum(checksum);
109     }
110 
111     /**
112      * Creates the fastest LZ4 decompressor factory.
113      *
114      * Note that by default, validation of the checksum header in each chunk is
115      * DISABLED for performance improvements. If performance is less of an issue,
116      * or if you would prefer the safety that checksum validation brings, please
117      * use the {@link #newFactory(boolean)} constructor with the argument
118      * set to {@code true}.
119      *
120      * @return the factory.
121      */
122     public static Supplier<Lz4Decompressor> newFactory() {
123         return newFactory(false);
124     }
125 
126     /**
127      * Creates a LZ4 decompressor factory with fastest decoder instance available on your machine.
128      *
129      * @param validateChecksums  if {@code true}, the checksum field will be validated against the actual
130      *                           uncompressed data, and if the checksums do not match, a suitable
131      *                           {@link DecompressionException} will be thrown
132      * @return the factory.
133      */
134     public static Supplier<Lz4Decompressor> newFactory(boolean validateChecksums) {
135         return newFactory(LZ4Factory.fastestInstance(), validateChecksums);
136     }
137 
138     /**
139      * Creates a LZ4 decompressor factory with customizable implementation.
140      *
141      * @param factory            user customizable {@link LZ4Factory} instance
142      *                           which may be JNI bindings to the original C implementation, a pure Java implementation
143      *                           or a Java implementation that uses the {@link sun.misc.Unsafe}
144      * @param validateChecksums  if {@code true}, the checksum field will be validated against the actual
145      *                           uncompressed data, and if the checksums do not match, a suitable
146      *                           {@link DecompressionException} will be thrown. In this case encoder will use
147      *                           xxhash hashing for Java, based on Yann Collet's work available at
148      *                           <a href="https://github.com/Cyan4973/xxHash">Github</a>.
149      * @return the factory.
150      */
151     public static Supplier<Lz4Decompressor> newFactory(LZ4Factory factory, boolean validateChecksums) {
152         return newFactory(factory, validateChecksums ? new Lz4XXHash32(DEFAULT_SEED) : null);
153     }
154 
155     /**
156      * Creates a customizable LZ4 decompressor factory.
157      *
158      * @param factory   user customizable {@link LZ4Factory} instance
159      *                  which may be JNI bindings to the original C implementation, a pure Java implementation
160      *                  or a Java implementation that uses the {@link sun.misc.Unsafe}
161      * @param checksum  the {@link Checksum} instance to use to check data for integrity.
162      *                  You may set {@code null} if you do not want to validate checksum of each block
163      * @return the factory.
164      */
165     public static Supplier<Lz4Decompressor> newFactory(LZ4Factory factory, Checksum checksum) {
166         requireNonNull(factory, "factory");
167         return () -> new Lz4Decompressor(factory, checksum);
168     }
169 
170     private void decompress(Buffer compressed, Buffer uncompressed) {
171         assert compressed.countReadableComponents() == 1;
172         try (var writableIteration = uncompressed.forEachWritable()) {
173             var writableComponent = writableIteration.first();
174             try (var readableIteration = compressed.forEachReadable()) {
175                 var readableComponent = readableIteration.first();
176                 decompressor.decompress(
177                         readableComponent.readableBuffer(), writableComponent.writableBuffer());
178             }
179         }
180     }
181 
182     @Override
183     public Buffer decompress(Buffer in, BufferAllocator allocator) throws DecompressionException {
184         try {
185             switch (currentState) {
186                 case CORRUPTED:
187                 case FINISHED:
188                     return allocator.allocate(0);
189                 case CLOSED:
190                     throw new DecompressionException("Decompressor closed");
191                 case INIT_BLOCK:
192                     if (in.readableBytes() < HEADER_LENGTH) {
193                         return null;
194                     }
195                     final long magic = in.readLong();
196                     if (magic != MAGIC_NUMBER) {
197                         streamCorrupted("unexpected block identifier");
198                     }
199 
200                     final int token = in.readByte();
201                     final int compressionLevel = (token & 0x0F) + COMPRESSION_LEVEL_BASE;
202                     int blockType = token & 0xF0;
203 
204                     int compressedLength = Integer.reverseBytes(in.readInt());
205                     if (compressedLength < 0 || compressedLength > MAX_BLOCK_SIZE) {
206                         streamCorrupted(String.format(
207                                 "invalid compressedLength: %d (expected: 0-%d)",
208                                 compressedLength, MAX_BLOCK_SIZE));
209                     }
210 
211                     int decompressedLength = Integer.reverseBytes(in.readInt());
212                     final int maxDecompressedLength = 1 << compressionLevel;
213                     if (decompressedLength < 0 || decompressedLength > maxDecompressedLength) {
214                         streamCorrupted(String.format(
215                                 "invalid decompressedLength: %d (expected: 0-%d)",
216                                 decompressedLength, maxDecompressedLength));
217                     }
218                     if (decompressedLength == 0 && compressedLength != 0
219                             || decompressedLength != 0 && compressedLength == 0
220                             || blockType == BLOCK_TYPE_NON_COMPRESSED && decompressedLength != compressedLength) {
221                         streamCorrupted(String.format(
222                                 "stream corrupted: compressedLength(%d) and decompressedLength(%d) mismatch",
223                                 compressedLength, decompressedLength));
224                     }
225 
226                     int currentChecksum = Integer.reverseBytes(in.readInt());
227                     if (decompressedLength == 0 && compressedLength == 0) {
228                         if (currentChecksum != 0) {
229                             streamCorrupted("stream corrupted: checksum error");
230                         }
231                         currentState = State.FINISHED;
232                         decompressor = null;
233                         checksum = null;
234                         return null;
235                     }
236 
237                     this.blockType = blockType;
238                     this.compressedLength = compressedLength;
239                     this.decompressedLength = decompressedLength;
240                     this.currentChecksum = currentChecksum;
241 
242                     currentState = State.DECOMPRESS_DATA;
243                     // fall through
244                 case DECOMPRESS_DATA:
245                     blockType = this.blockType;
246                     compressedLength = this.compressedLength;
247                     decompressedLength = this.decompressedLength;
248                     currentChecksum = this.currentChecksum;
249 
250                     if (in.readableBytes() < compressedLength) {
251                         return null;
252                     }
253 
254                     final BufferChecksum checksum = this.checksum;
255                     Buffer uncompressed = null;
256 
257                     try {
258                         switch (blockType) {
259                             case BLOCK_TYPE_NON_COMPRESSED:
260                                 // Just pass through.
261                                 assert compressedLength == decompressedLength;
262                                 uncompressed = in.readSplit(decompressedLength);
263                                 break;
264                             case BLOCK_TYPE_COMPRESSED:
265                                 uncompressed = allocator.allocate(decompressedLength);
266 
267                                 assert uncompressed.countWritableComponents() == 1;
268                                 if (in.countReadableComponents() > 1) {
269                                     // Lz4 needs to get the compressed data to be feed in one buffer so we need to do
270                                     // a memory copy.
271                                     try (Buffer inBuffer = allocator.allocate(compressedLength)) {
272                                         in.copyInto(in.readerOffset(), inBuffer,
273                                                 inBuffer.writerOffset(), compressedLength);
274                                         inBuffer.skipWritableBytes(compressedLength);
275                                         decompress(inBuffer, uncompressed);
276                                     }
277                                 } else {
278                                     decompress(in, uncompressed);
279                                 }
280                                 in.skipReadableBytes(compressedLength);
281                                 // Update the writerIndex now to reflect what we decompressed.
282                                 uncompressed.skipWritableBytes(decompressedLength);
283                                 break;
284                             default:
285                                 streamCorrupted(String.format(
286                                         "unexpected blockType: %d (expected: %d or %d)",
287                                         blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
288                         }
289 
290                         if (checksum != null) {
291                             CompressionUtil.checkChecksum(checksum, uncompressed, currentChecksum);
292                         }
293                         Buffer buffer = uncompressed;
294                         uncompressed = null;
295                         currentState = State.INIT_BLOCK;
296                         return buffer;
297                     } catch (LZ4Exception e) {
298                         streamCorrupted(e);
299                     } finally {
300                         if (uncompressed != null) {
301                             uncompressed.close();
302                         }
303                     }
304                 default:
305                     throw new IllegalStateException();
306             }
307         } catch (Exception e) {
308             currentState = State.CORRUPTED;
309             throw e;
310         }
311     }
312 
313     @Override
314     public boolean isFinished() {
315         switch (currentState) {
316             case FINISHED:
317             case CLOSED:
318             case CORRUPTED:
319                 return true;
320             default:
321                 return false;
322         }
323     }
324 
325     @Override
326     public boolean isClosed() {
327         return currentState == State.CLOSED;
328     }
329 
330     @Override
331     public void close() {
332         currentState = State.CLOSED;
333     }
334 
335     private void streamCorrupted(String message) {
336         currentState = State.CORRUPTED;
337         throw new DecompressionException(message);
338     }
339 
340     private void streamCorrupted(Exception cause) {
341         currentState = State.CORRUPTED;
342         throw new DecompressionException(cause);
343     }
344 }