View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty5.handler.codec.compression;
18  
19  import io.netty5.buffer.api.Buffer;
20  import io.netty5.buffer.api.BufferAllocator;
21  import io.netty5.handler.codec.EncoderException;
22  import io.netty5.util.internal.ObjectUtil;
23  import net.jpountz.lz4.LZ4Compressor;
24  import net.jpountz.lz4.LZ4Exception;
25  import net.jpountz.lz4.LZ4Factory;
26  
27  import java.nio.ByteBuffer;
28  import java.util.function.Supplier;
29  import java.util.zip.Checksum;
30  
31  import static io.netty5.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
32  import static io.netty5.handler.codec.compression.Lz4Constants.BLOCK_TYPE_NON_COMPRESSED;
33  import static io.netty5.handler.codec.compression.Lz4Constants.CHECKSUM_OFFSET;
34  import static io.netty5.handler.codec.compression.Lz4Constants.COMPRESSED_LENGTH_OFFSET;
35  import static io.netty5.handler.codec.compression.Lz4Constants.COMPRESSION_LEVEL_BASE;
36  import static io.netty5.handler.codec.compression.Lz4Constants.DECOMPRESSED_LENGTH_OFFSET;
37  import static io.netty5.handler.codec.compression.Lz4Constants.DEFAULT_BLOCK_SIZE;
38  import static io.netty5.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
39  import static io.netty5.handler.codec.compression.Lz4Constants.HEADER_LENGTH;
40  import static io.netty5.handler.codec.compression.Lz4Constants.MAGIC_NUMBER;
41  import static io.netty5.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
42  import static io.netty5.handler.codec.compression.Lz4Constants.MIN_BLOCK_SIZE;
43  import static io.netty5.handler.codec.compression.Lz4Constants.TOKEN_OFFSET;
44  import static java.util.Objects.requireNonNull;
45  
46  /**
47   * Compresses a {@link Buffer} using the LZ4 format.
48   *
49   * See original <a href="https://github.com/Cyan4973/lz4">LZ4 Github project</a>
50   * and <a href="https://fastcompression.blogspot.ru/2011/05/lz4-explained.html">LZ4 block format</a>
51   * for full description.
52   *
53   * Since the original LZ4 block format does not contain the size of the compressed block and the size of the original data
54   * this encoder uses format like <a href="https://github.com/idelpivnitskiy/lz4-java">LZ4 Java</a> library
55   * written by Adrien Grand and approved by Yann Collet (author of original LZ4 library).
56   *
57   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
58   *  * Magic * Token *  Compressed *  Decompressed *  Checksum *  +  *  LZ4 compressed *
59   *  *       *       *    length   *     length    *           *     *      block      *
60   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
61   */
62  public final class Lz4Compressor implements Compressor {
63      static final int DEFAULT_MAX_ENCODE_SIZE = Integer.MAX_VALUE;
64  
65      private final int blockSize;
66  
67      /**
68       * Underlying compressor in use.
69       */
70      private final LZ4Compressor compressor;
71  
72      /**
73       * Underlying checksum calculator in use.
74       */
75      private final BufferChecksum checksum;
76  
77      /**
78       * Compression level of current LZ4 encoder (depends on {@link #blockSize}).
79       */
80      private final int compressionLevel;
81  
82      /**
83       * Maximum size for any buffer to write encoded (compressed) data into.
84       */
85      private final int maxEncodeSize;
86  
87      private enum State {
88          PROCESSING,
89          FINISHED,
90          CLOSED
91      }
92  
93      private State state = State.PROCESSING;
94  
95      /**
96       * Creates the fastest LZ4 compressor factory with default block size (64 KB)
97       * and xxhash hashing for Java, based on Yann Collet's work available at
98       * <a href="https://github.com/Cyan4973/xxHash">Github</a>.
99       */
100     public static Supplier<Lz4Compressor> newFactory() {
101         return newFactory(false);
102     }
103 
104     /**
105      * Creates a new LZ4 compressor factory with high or fast compression, default block size (64 KB)
106      * and xxhash hashing for Java, based on Yann Collet's work available at
107      * <a href="https://github.com/Cyan4973/xxHash">Github</a>.
108      *
109      * @param highCompressor  if {@code true} codec will use compressor which requires more memory
110      *                        and is slower but compresses more efficiently
111      * @return the factory.
112      */
113     public static Supplier<Lz4Compressor> newFactory(boolean highCompressor) {
114         return newFactory(LZ4Factory.fastestInstance(), highCompressor,
115                 DEFAULT_BLOCK_SIZE, new Lz4XXHash32(DEFAULT_SEED));
116     }
117 
118     /**
119      * Creates a new customizable LZ4 compressor factory.
120      *
121      * @param factory         user customizable {@link LZ4Factory} instance
122      *                        which may be JNI bindings to the original C implementation, a pure Java implementation
123      *                        or a Java implementation that uses the {@link sun.misc.Unsafe}
124      * @param highCompressor  if {@code true} codec will use compressor which requires more memory
125      *                        and is slower but compresses more efficiently
126      * @param blockSize       the maximum number of bytes to try to compress at once,
127      *                        must be >= 64 and <= 32 M
128      * @param checksum        the {@link Checksum} instance to use to check data for integrity
129      * @return the factory.
130      */
131     public static Supplier<Lz4Compressor> newFactory(LZ4Factory factory, boolean highCompressor,
132                                                      int blockSize, Checksum checksum) {
133         return newFactory(factory, highCompressor, blockSize, checksum, DEFAULT_MAX_ENCODE_SIZE);
134     }
135 
136     /**
137      * Creates a new customizable LZ4 compressor factory.
138      *
139      * @param factory         user customizable {@link LZ4Factory} instance
140      *                        which may be JNI bindings to the original C implementation, a pure Java implementation
141      *                        or a Java implementation that uses the {@link sun.misc.Unsafe}
142      * @param highCompressor  if {@code true} codec will use compressor which requires more memory
143      *                        and is slower but compresses more efficiently
144      * @param blockSize       the maximum number of bytes to try to compress at once,
145      *                        must be >= 64 and <= 32 M
146      * @param checksum        the {@link Checksum} instance to use to check data for integrity
147      * @param maxEncodeSize   the maximum size for an encode (compressed) buffer
148      * @return the factory.
149      */
150     public static Supplier<Lz4Compressor> newFactory(LZ4Factory factory, boolean highCompressor, int blockSize,
151                                                      Checksum checksum, int maxEncodeSize) {
152         requireNonNull(factory, "factory");
153         requireNonNull(checksum, "checksum");
154         ObjectUtil.checkPositive(blockSize, "blockSize");
155         ObjectUtil.checkPositive(maxEncodeSize, "maxEncodeSize");
156         return () -> new Lz4Compressor(factory, highCompressor, blockSize, checksum, maxEncodeSize);
157     }
158 
159     private Lz4Compressor(LZ4Factory factory, boolean highCompressor, int blockSize,
160                           Checksum checksum, int maxEncodeSize) {
161         compressor = highCompressor ? factory.highCompressor() : factory.fastCompressor();
162         this.checksum = checksum == null ? null : checksum instanceof Lz4XXHash32 ? (Lz4XXHash32) checksum :
163                 new BufferChecksum(checksum);
164 
165         compressionLevel = compressionLevel(blockSize);
166         this.blockSize = blockSize;
167         this.maxEncodeSize = maxEncodeSize;
168     }
169 
170     /**
171      * Calculates compression level on the basis of block size.
172      */
173     private static int compressionLevel(int blockSize) {
174         if (blockSize < MIN_BLOCK_SIZE || blockSize > MAX_BLOCK_SIZE) {
175             throw new IllegalArgumentException(String.format(
176                     "blockSize: %d (expected: %d-%d)", blockSize, MIN_BLOCK_SIZE, MAX_BLOCK_SIZE));
177         }
178         int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1); // ceil of log2
179         compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
180         return compressionLevel;
181     }
182 
183     private Buffer allocateBuffer(BufferAllocator allocator, Buffer msg) {
184         int targetBufSize = 0;
185         int remaining = msg.readableBytes();
186 
187         // quick overflow check
188         if (remaining < 0) {
189             throw new EncoderException("too much data to allocate a buffer for compression");
190         }
191 
192         while (remaining > 0) {
193             int curSize = Math.min(blockSize, remaining);
194             remaining -= curSize;
195             // calculate the total compressed size of the current block (including header) and add to the total
196             targetBufSize += compressor.maxCompressedLength(curSize) + HEADER_LENGTH;
197         }
198 
199         // in addition to just the raw byte count, the headers (HEADER_LENGTH) per block (configured via
200         // #blockSize) will also add to the targetBufSize, and the combination of those would never wrap around
201         // again to be >= 0, this is a good check for the overflow case.
202         if (targetBufSize > maxEncodeSize || 0 > targetBufSize) {
203             throw new EncoderException(String.format("requested encode buffer size (%d bytes) exceeds the maximum " +
204                                                      "allowable size (%d bytes)", targetBufSize, maxEncodeSize));
205         }
206 
207         return allocator.allocate(targetBufSize);
208     }
209 
210     @Override
211     public Buffer compress(Buffer input, BufferAllocator allocator) throws CompressionException {
212         switch (state) {
213             case CLOSED:
214                 throw new CompressionException("Compressor closed");
215             case FINISHED:
216                 return allocator.allocate(0);
217             case PROCESSING:
218                 if (input.readableBytes() == 0) {
219                     return allocator.allocate(0);
220                 }
221 
222                 Buffer out = allocateBuffer(allocator, input);
223                 try {
224                     // We need to compress as long as we have input to read as we are limited by the blockSize that
225                     // is used.
226                     while (input.readableBytes() > 0) {
227                         compressData(input, out);
228                     }
229                 } catch (Throwable cause) {
230                     out.close();
231                     throw cause;
232                 }
233                 return out;
234             default:
235                 throw new IllegalStateException();
236         }
237     }
238 
239     @Override
240     public Buffer finish(BufferAllocator allocator) {
241         switch (state) {
242             case CLOSED:
243                 throw new CompressionException("Compressor closed");
244             case FINISHED:
245             case PROCESSING:
246                 state = State.FINISHED;
247 
248                 final Buffer footer = allocator.allocate(HEADER_LENGTH);
249                 footer.ensureWritable(HEADER_LENGTH);
250                 final int idx = footer.writerOffset();
251                 footer.setLong(idx, MAGIC_NUMBER);
252                 footer.setByte(idx + TOKEN_OFFSET, (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel));
253                 footer.setInt(idx + COMPRESSED_LENGTH_OFFSET, 0);
254                 footer.setInt(idx + DECOMPRESSED_LENGTH_OFFSET, 0);
255                 footer.setInt(idx + CHECKSUM_OFFSET, 0);
256 
257                 footer.skipWritableBytes(HEADER_LENGTH);
258                 return footer;
259             default:
260                 throw new IllegalStateException();
261         }
262     }
263 
264     @Override
265     public boolean isFinished() {
266         return state != State.PROCESSING;
267     }
268 
269     @Override
270     public boolean isClosed() {
271         return state == State.CLOSED;
272     }
273 
274     @Override
275     public void close() {
276         state = State.CLOSED;
277     }
278 
279     /**
280      *
281      * Encodes the input buffer into {@link #blockSize} chunks in the output buffer.
282      */
283     private void compressData(Buffer in, Buffer out) {
284         int inReaderIndex = in.readerOffset();
285         int flushableBytes = Math.min(in.readableBytes(), blockSize);
286         assert flushableBytes > 0;
287         checksum.reset();
288         checksum.update(in, inReaderIndex, flushableBytes);
289         final int check = (int) checksum.getValue();
290 
291         final int bufSize = compressor.maxCompressedLength(flushableBytes) + HEADER_LENGTH;
292         out.ensureWritable(bufSize);
293         final int idx = out.writerOffset();
294         int compressedLength = 0;
295         try {
296             out.skipWritableBytes(HEADER_LENGTH);
297             assert out.countWritableComponents() == 1;
298             int readable = flushableBytes;
299 
300             try (var writableIteration = out.forEachWritable()) {
301                 var writableComponent = writableIteration.first();
302                 try (var readableIteration = in.forEachReadable()) {
303                     for (var readableComponent = readableIteration.first();
304                          readableComponent != null; readableComponent = readableComponent.next()) {
305                         ByteBuffer outNioBuffer = writableComponent.writableBuffer();
306                         int pos = outNioBuffer.position();
307                         ByteBuffer inNioBuffer = readableComponent.readableBuffer();
308                         if (inNioBuffer.remaining() > readable) {
309                             inNioBuffer.limit(inNioBuffer.position() + readable);
310                             compressor.compress(inNioBuffer, outNioBuffer);
311                             compressedLength += outNioBuffer.position() - pos;
312                             break;
313                         } else {
314                             readable -= inNioBuffer.remaining();
315                             compressor.compress(inNioBuffer, outNioBuffer);
316                             compressedLength += outNioBuffer.position() - pos;
317                         }
318                     }
319                 }
320             }
321         } catch (LZ4Exception e) {
322             throw new CompressionException(e);
323         } finally {
324             out.writerOffset(idx);
325         }
326         final int blockType;
327         if (compressedLength >= flushableBytes) {
328             blockType = BLOCK_TYPE_NON_COMPRESSED;
329             compressedLength = flushableBytes;
330             in.copyInto(inReaderIndex, out, idx + HEADER_LENGTH, flushableBytes);
331         } else {
332             blockType = BLOCK_TYPE_COMPRESSED;
333         }
334 
335         out.setLong(idx, MAGIC_NUMBER);
336         out.setByte(idx + TOKEN_OFFSET, (byte) (blockType | compressionLevel));
337         out.setInt(idx + COMPRESSED_LENGTH_OFFSET, Integer.reverseBytes(compressedLength));
338         out.setInt(idx + DECOMPRESSED_LENGTH_OFFSET, Integer.reverseBytes(flushableBytes));
339         out.setInt(idx + CHECKSUM_OFFSET, Integer.reverseBytes(check));
340         out.writerOffset(idx + HEADER_LENGTH + compressedLength);
341 
342         in.readerOffset(inReaderIndex + flushableBytes);
343     }
344 }