View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.compression;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.handler.codec.EncoderException;
26  import io.netty.handler.codec.MessageToByteEncoder;
27  import io.netty.util.concurrent.EventExecutor;
28  import io.netty.util.concurrent.PromiseNotifier;
29  import io.netty.util.internal.ObjectUtil;
30  import net.jpountz.lz4.LZ4Compressor;
31  import net.jpountz.lz4.LZ4Exception;
32  import net.jpountz.lz4.LZ4Factory;
33  
34  import java.nio.ByteBuffer;
35  import java.util.zip.Checksum;
36  
37  import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
38  import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_NON_COMPRESSED;
39  import static io.netty.handler.codec.compression.Lz4Constants.CHECKSUM_OFFSET;
40  import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSED_LENGTH_OFFSET;
41  import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSION_LEVEL_BASE;
42  import static io.netty.handler.codec.compression.Lz4Constants.DECOMPRESSED_LENGTH_OFFSET;
43  import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_BLOCK_SIZE;
44  import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
45  import static io.netty.handler.codec.compression.Lz4Constants.HEADER_LENGTH;
46  import static io.netty.handler.codec.compression.Lz4Constants.MAGIC_NUMBER;
47  import static io.netty.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
48  import static io.netty.handler.codec.compression.Lz4Constants.MIN_BLOCK_SIZE;
49  import static io.netty.handler.codec.compression.Lz4Constants.TOKEN_OFFSET;
50  
51  /**
52   * Compresses a {@link ByteBuf} using the LZ4 format.
53   *
54   * See original <a href="https://github.com/Cyan4973/lz4">LZ4 Github project</a>
55   * and <a href="https://fastcompression.blogspot.ru/2011/05/lz4-explained.html">LZ4 block format</a>
56   * for full description.
57   *
58   * Since the original LZ4 block format does not contains size of compressed block and size of original data
59   * this encoder uses format like <a href="https://github.com/idelpivnitskiy/lz4-java">LZ4 Java</a> library
60   * written by Adrien Grand and approved by Yann Collet (author of original LZ4 library).
61   *
62   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
63   *  * Magic * Token *  Compressed *  Decompressed *  Checksum *  +  *  LZ4 compressed *
64   *  *       *       *    length   *     length    *           *     *      block      *
65   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
66   */
67  public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
68      static final int DEFAULT_MAX_ENCODE_SIZE = Integer.MAX_VALUE;
69  
70      private final int blockSize;
71  
72      /**
73       * Underlying compressor in use.
74       */
75      private final LZ4Compressor compressor;
76  
77      /**
78       * Underlying checksum calculator in use.
79       */
80      private final ByteBufChecksum checksum;
81  
82      /**
83       * Compression level of current LZ4 encoder (depends on {@link #blockSize}).
84       */
85      private final int compressionLevel;
86  
87      /**
88       * Inner byte buffer for outgoing data. It's capacity will be {@link #blockSize}.
89       */
90      private ByteBuf buffer;
91  
92      /**
93       * Maximum size for any buffer to write encoded (compressed) data into.
94       */
95      private final int maxEncodeSize;
96  
97      /**
98       * Indicates if the compressed stream has been finished.
99       */
100     private volatile boolean finished;
101 
102     /**
103      * Used to interact with its {@link ChannelPipeline} and other handlers.
104      */
105     private volatile ChannelHandlerContext ctx;
106 
107     /**
108      * Creates the fastest LZ4 encoder with default block size (64 KB)
109      * and xxhash hashing for Java, based on Yann Collet's work available at
110      * <a href="https://github.com/Cyan4973/xxHash">Github</a>.
111      */
112     public Lz4FrameEncoder() {
113         this(false);
114     }
115 
116     /**
117      * Creates a new LZ4 encoder with hight or fast compression, default block size (64 KB)
118      * and xxhash hashing for Java, based on Yann Collet's work available at
119      * <a href="https://github.com/Cyan4973/xxHash">Github</a>.
120      *
121      * @param highCompressor  if {@code true} codec will use compressor which requires more memory
122      *                        and is slower but compresses more efficiently
123      */
124     public Lz4FrameEncoder(boolean highCompressor) {
125         this(LZ4Factory.fastestInstance(), highCompressor, DEFAULT_BLOCK_SIZE, new Lz4XXHash32(DEFAULT_SEED));
126     }
127 
128     /**
129      * Creates a new customizable LZ4 encoder.
130      *
131      * @param factory         user customizable {@link LZ4Factory} instance
132      *                        which may be JNI bindings to the original C implementation, a pure Java implementation
133      *                        or a Java implementation that uses the {@link sun.misc.Unsafe}
134      * @param highCompressor  if {@code true} codec will use compressor which requires more memory
135      *                        and is slower but compresses more efficiently
136      * @param blockSize       the maximum number of bytes to try to compress at once,
137      *                        must be >= 64 and <= 32 M
138      * @param checksum        the {@link Checksum} instance to use to check data for integrity
139      */
140     public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize, Checksum checksum) {
141         this(factory, highCompressor, blockSize, checksum, DEFAULT_MAX_ENCODE_SIZE);
142     }
143 
144         /**
145          * Creates a new customizable LZ4 encoder.
146          *
147          * @param factory         user customizable {@link LZ4Factory} instance
148          *                        which may be JNI bindings to the original C implementation, a pure Java implementation
149          *                        or a Java implementation that uses the {@link sun.misc.Unsafe}
150          * @param highCompressor  if {@code true} codec will use compressor which requires more memory
151          *                        and is slower but compresses more efficiently
152          * @param blockSize       the maximum number of bytes to try to compress at once,
153          *                        must be >= 64 and <= 32 M
154          * @param checksum        the {@link Checksum} instance to use to check data for integrity
155          * @param maxEncodeSize   the maximum size for an encode (compressed) buffer
156          */
157     public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize,
158                            Checksum checksum, int maxEncodeSize) {
159         super(ByteBuf.class);
160         ObjectUtil.checkNotNull(factory, "factory");
161         ObjectUtil.checkNotNull(checksum, "checksum");
162 
163         compressor = highCompressor ? factory.highCompressor() : factory.fastCompressor();
164         this.checksum = ByteBufChecksum.wrapChecksum(checksum);
165 
166         compressionLevel = compressionLevel(blockSize);
167         this.blockSize = blockSize;
168         this.maxEncodeSize = ObjectUtil.checkPositive(maxEncodeSize, "maxEncodeSize");
169         finished = false;
170     }
171 
172     /**
173      * Calculates compression level on the basis of block size.
174      */
175     private static int compressionLevel(int blockSize) {
176         if (blockSize < MIN_BLOCK_SIZE || blockSize > MAX_BLOCK_SIZE) {
177             throw new IllegalArgumentException(String.format(
178                     "blockSize: %d (expected: %d-%d)", blockSize, MIN_BLOCK_SIZE, MAX_BLOCK_SIZE));
179         }
180         int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1); // ceil of log2
181         compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
182         return compressionLevel;
183     }
184 
185     @Override
186     protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) {
187         return allocateBuffer(ctx, msg, preferDirect, true);
188     }
189 
190     private ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect,
191                                    boolean allowEmptyReturn) {
192         int targetBufSize = 0;
193         int remaining = msg.readableBytes() + buffer.readableBytes();
194 
195         // quick overflow check
196         if (remaining < 0) {
197             throw new EncoderException("too much data to allocate a buffer for compression");
198         }
199 
200         while (remaining > 0) {
201             int curSize = Math.min(blockSize, remaining);
202             remaining -= curSize;
203             // calculate the total compressed size of the current block (including header) and add to the total
204             targetBufSize += compressor.maxCompressedLength(curSize) + HEADER_LENGTH;
205         }
206 
207         // in addition to just the raw byte count, the headers (HEADER_LENGTH) per block (configured via
208         // #blockSize) will also add to the targetBufSize, and the combination of those would never wrap around
209         // again to be >= 0, this is a good check for the overflow case.
210         if (targetBufSize > maxEncodeSize || 0 > targetBufSize) {
211             throw new EncoderException(String.format("requested encode buffer size (%d bytes) exceeds the maximum " +
212                                                      "allowable size (%d bytes)", targetBufSize, maxEncodeSize));
213         }
214 
215         if (allowEmptyReturn && targetBufSize < blockSize) {
216             return Unpooled.EMPTY_BUFFER;
217         }
218 
219         if (preferDirect) {
220             return ctx.alloc().ioBuffer(targetBufSize, targetBufSize);
221         } else {
222             return ctx.alloc().heapBuffer(targetBufSize, targetBufSize);
223         }
224     }
225 
226     /**
227      * {@inheritDoc}
228      *
229      * Encodes the input buffer into {@link #blockSize} chunks in the output buffer. Data is only compressed and
230      * written once we hit the {@link #blockSize}; else, it is copied into the backing {@link #buffer} to await
231      * more data.
232      */
233     @Override
234     protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
235         if (finished) {
236             if (!out.isWritable(in.readableBytes())) {
237                 // out should be EMPTY_BUFFER because we should have allocated enough space above in allocateBuffer.
238                 throw new IllegalStateException("encode finished and not enough space to write remaining data");
239             }
240             out.writeBytes(in);
241             return;
242         }
243 
244         final ByteBuf buffer = this.buffer;
245         int length;
246         while ((length = in.readableBytes()) > 0) {
247             final int nextChunkSize = Math.min(length, buffer.writableBytes());
248             in.readBytes(buffer, nextChunkSize);
249 
250             if (!buffer.isWritable()) {
251                 flushBufferedData(out);
252             }
253         }
254     }
255 
256     private void flushBufferedData(ByteBuf out) {
257         int flushableBytes = buffer.readableBytes();
258         if (flushableBytes == 0) {
259             return;
260         }
261         checksum.reset();
262         checksum.update(buffer, buffer.readerIndex(), flushableBytes);
263         final int check = (int) checksum.getValue();
264 
265         final int bufSize = compressor.maxCompressedLength(flushableBytes) + HEADER_LENGTH;
266         out.ensureWritable(bufSize);
267         final int idx = out.writerIndex();
268         int compressedLength;
269         try {
270             ByteBuffer outNioBuffer = out.internalNioBuffer(idx + HEADER_LENGTH, out.writableBytes() - HEADER_LENGTH);
271             int pos = outNioBuffer.position();
272             // We always want to start at position 0 as we take care of reusing the buffer in the encode(...) loop.
273             compressor.compress(buffer.internalNioBuffer(buffer.readerIndex(), flushableBytes), outNioBuffer);
274             compressedLength = outNioBuffer.position() - pos;
275         } catch (LZ4Exception e) {
276             throw new CompressionException(e);
277         }
278         final int blockType;
279         if (compressedLength >= flushableBytes) {
280             blockType = BLOCK_TYPE_NON_COMPRESSED;
281             compressedLength = flushableBytes;
282             out.setBytes(idx + HEADER_LENGTH, buffer, buffer.readerIndex(), flushableBytes);
283         } else {
284             blockType = BLOCK_TYPE_COMPRESSED;
285         }
286 
287         out.setLong(idx, MAGIC_NUMBER);
288         out.setByte(idx + TOKEN_OFFSET, (byte) (blockType | compressionLevel));
289         out.setIntLE(idx + COMPRESSED_LENGTH_OFFSET, compressedLength);
290         out.setIntLE(idx + DECOMPRESSED_LENGTH_OFFSET, flushableBytes);
291         out.setIntLE(idx + CHECKSUM_OFFSET, check);
292         out.writerIndex(idx + HEADER_LENGTH + compressedLength);
293         buffer.clear();
294     }
295 
296     @Override
297     public void flush(final ChannelHandlerContext ctx) throws Exception {
298         if (buffer != null && buffer.isReadable()) {
299             final ByteBuf buf = allocateBuffer(ctx, Unpooled.EMPTY_BUFFER, isPreferDirect(), false);
300             flushBufferedData(buf);
301             ctx.write(buf);
302         }
303         ctx.flush();
304     }
305 
306     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
307         if (finished) {
308             promise.setSuccess();
309             return promise;
310         }
311         finished = true;
312 
313         final ByteBuf footer = ctx.alloc().heapBuffer(
314                 compressor.maxCompressedLength(buffer.readableBytes()) + HEADER_LENGTH);
315         flushBufferedData(footer);
316 
317         footer.ensureWritable(HEADER_LENGTH);
318         final int idx = footer.writerIndex();
319         footer.setLong(idx, MAGIC_NUMBER);
320         footer.setByte(idx + TOKEN_OFFSET, (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel));
321         footer.setInt(idx + COMPRESSED_LENGTH_OFFSET, 0);
322         footer.setInt(idx + DECOMPRESSED_LENGTH_OFFSET, 0);
323         footer.setInt(idx + CHECKSUM_OFFSET, 0);
324 
325         footer.writerIndex(idx + HEADER_LENGTH);
326 
327         return ctx.writeAndFlush(footer, promise);
328     }
329 
330     /**
331      * Returns {@code true} if and only if the compressed stream has been finished.
332      */
333     public boolean isClosed() {
334         return finished;
335     }
336 
337     /**
338      * Close this {@link Lz4FrameEncoder} and so finish the encoding.
339      *
340      * The returned {@link ChannelFuture} will be notified once the operation completes.
341      */
342     public ChannelFuture close() {
343         return close(ctx().newPromise());
344     }
345 
346     /**
347      * Close this {@link Lz4FrameEncoder} and so finish the encoding.
348      * The given {@link ChannelFuture} will be notified once the operation
349      * completes and will also be returned.
350      */
351     public ChannelFuture close(final ChannelPromise promise) {
352         ChannelHandlerContext ctx = ctx();
353         EventExecutor executor = ctx.executor();
354         if (executor.inEventLoop()) {
355             return finishEncode(ctx, promise);
356         } else {
357             executor.execute(new Runnable() {
358                 @Override
359                 public void run() {
360                     ChannelFuture f = finishEncode(ctx(), promise);
361                     PromiseNotifier.cascade(f, promise);
362                 }
363             });
364             return promise;
365         }
366     }
367 
368     @Override
369     public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
370         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
371 
372         EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
373     }
374 
375     private ChannelHandlerContext ctx() {
376         ChannelHandlerContext ctx = this.ctx;
377         if (ctx == null) {
378             throw new IllegalStateException("not added to a pipeline");
379         }
380         return ctx;
381     }
382 
383     @Override
384     public void handlerAdded(ChannelHandlerContext ctx) {
385         this.ctx = ctx;
386         // Ensure we use a heap based ByteBuf.
387         buffer = Unpooled.wrappedBuffer(new byte[blockSize]);
388         buffer.clear();
389     }
390 
391     @Override
392     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
393         super.handlerRemoved(ctx);
394         if (buffer != null) {
395             buffer.release();
396             buffer = null;
397         }
398     }
399 
400     final ByteBuf getBackingBuffer() {
401         return buffer;
402     }
403 }