/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.netty.handler.codec.compression;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.ObjectUtil;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.zip.Checksum;

import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_NON_COMPRESSED;
import static io.netty.handler.codec.compression.Lz4Constants.CHECKSUM_OFFSET;
import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSED_LENGTH_OFFSET;
import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSION_LEVEL_BASE;
import static io.netty.handler.codec.compression.Lz4Constants.DECOMPRESSED_LENGTH_OFFSET;
import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_BLOCK_SIZE;
import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
import static io.netty.handler.codec.compression.Lz4Constants.HEADER_LENGTH;
import static io.netty.handler.codec.compression.Lz4Constants.MAGIC_NUMBER;
import static io.netty.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
import static io.netty.handler.codec.compression.Lz4Constants.MIN_BLOCK_SIZE;
import static io.netty.handler.codec.compression.Lz4Constants.THREAD_POOL_DELAY_SECONDS;
import static io.netty.handler.codec.compression.Lz4Constants.TOKEN_OFFSET;

/**
 * Compresses a {@link ByteBuf} using the LZ4 format.
 *
 * See original <a href="https://github.com/Cyan4973/lz4">LZ4 Github project</a>
 * and <a href="https://fastcompression.blogspot.ru/2011/05/lz4-explained.html">LZ4 block format</a>
 * for full description.
 *
 * Since the original LZ4 block format does not contain the size of the compressed block or the size of
 * the original data, this encoder uses a format like that of the <a href="https://github.com/idelpivnitskiy/lz4-java">LZ4 Java</a>
 * library written by Adrien Grand and approved by Yann Collet (the author of the original LZ4 library).
 *
 *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
 *  * Magic * Token *  Compressed *  Decompressed *  Checksum *  +  *  LZ4 compressed *
 *  *       *       *    length   *     length    *           *     *      block      *
 *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
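 *
 * A minimal usage sketch (the {@code channel} variable and the handler name below are illustrative only
 * and not part of this class):
 * <pre>{@code
 * ChannelPipeline pipeline = channel.pipeline();
 * pipeline.addLast("lz4Encoder", new Lz4FrameEncoder());
 * // Outbound ByteBufs written from this point on are compressed into LZ4 frames; the remote peer
 * // would typically decode them with a matching Lz4FrameDecoder.
 * }</pre>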
 */
public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
    static final int DEFAULT_MAX_ENCODE_SIZE = Integer.MAX_VALUE;

    private final int blockSize;

    /**
     * Underlying compressor in use.
     */
    private final LZ4Compressor compressor;

    /**
     * Underlying checksum calculator in use.
     */
    private final ByteBufChecksum checksum;

    /**
     * Compression level of current LZ4 encoder (depends on {@link #blockSize}).
     */
    private final int compressionLevel;

    /**
     * Inner byte buffer for outgoing data. Its capacity will be {@link #blockSize}.
     */
    private ByteBuf buffer;

    /**
     * Maximum size for any buffer to write encoded (compressed) data into.
     */
    private final int maxEncodeSize;

    /**
     * Indicates if the compressed stream has been finished.
     */
    private volatile boolean finished;

    /**
     * Used to interact with its {@link ChannelPipeline} and other handlers.
     */
    private volatile ChannelHandlerContext ctx;

    /**
     * Creates the fastest LZ4 encoder with default block size (64 KB)
     * and xxhash hashing for Java, based on Yann Collet's work available at
     * <a href="https://github.com/Cyan4973/xxHash">Github</a>.
     */
    public Lz4FrameEncoder() {
        this(false);
    }

    /**
     * Creates a new LZ4 encoder with high or fast compression, default block size (64 KB)
     * and xxhash hashing for Java, based on Yann Collet's work available at
     * <a href="https://github.com/Cyan4973/xxHash">Github</a>.
     *
     * @param highCompressor  if {@code true} codec will use compressor which requires more memory
     *                        and is slower but compresses more efficiently
     */
    public Lz4FrameEncoder(boolean highCompressor) {
        this(LZ4Factory.fastestInstance(), highCompressor, DEFAULT_BLOCK_SIZE, new Lz4XXHash32(DEFAULT_SEED));
    }

    /**
     * Creates a new customizable LZ4 encoder.
     *
     * @param factory         user customizable {@link LZ4Factory} instance
     *                        which may be JNI bindings to the original C implementation, a pure Java implementation
     *                        or a Java implementation that uses the {@link sun.misc.Unsafe}
     * @param highCompressor  if {@code true} codec will use compressor which requires more memory
     *                        and is slower but compresses more efficiently
     * @param blockSize       the maximum number of bytes to try to compress at once,
     *                        must be >= 64 and <= 32 M
     * @param checksum        the {@link Checksum} instance to use to check data for integrity
     */
    public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize, Checksum checksum) {
        this(factory, highCompressor, blockSize, checksum, DEFAULT_MAX_ENCODE_SIZE);
    }

    /**
     * Creates a new customizable LZ4 encoder.
     *
     * @param factory         user customizable {@link LZ4Factory} instance
     *                        which may be JNI bindings to the original C implementation, a pure Java implementation
     *                        or a Java implementation that uses the {@link sun.misc.Unsafe}
     * @param highCompressor  if {@code true} codec will use compressor which requires more memory
     *                        and is slower but compresses more efficiently
     * @param blockSize       the maximum number of bytes to try to compress at once,
     *                        must be >= 64 and <= 32 M
     * @param checksum        the {@link Checksum} instance to use to check data for integrity
     * @param maxEncodeSize   the maximum size for an encode (compressed) buffer
     */
    public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize,
                           Checksum checksum, int maxEncodeSize) {
        ObjectUtil.checkNotNull(factory, "factory");
        ObjectUtil.checkNotNull(checksum, "checksum");

        compressor = highCompressor ? factory.highCompressor() : factory.fastCompressor();
        this.checksum = ByteBufChecksum.wrapChecksum(checksum);

        compressionLevel = compressionLevel(blockSize);
        this.blockSize = blockSize;
        this.maxEncodeSize = ObjectUtil.checkPositive(maxEncodeSize, "maxEncodeSize");
        finished = false;
    }

    /**
     * Calculates compression level on the basis of block size.
     */
    private static int compressionLevel(int blockSize) {
        if (blockSize < MIN_BLOCK_SIZE || blockSize > MAX_BLOCK_SIZE) {
            throw new IllegalArgumentException(String.format(
                    "blockSize: %d (expected: %d-%d)", blockSize, MIN_BLOCK_SIZE, MAX_BLOCK_SIZE));
        }
        int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1); // ceil of log2
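        // For example, the default 64 KB block size gives ceil(log2(65536)) = 16 before the base offset
        // below is subtracted.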
        compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
        return compressionLevel;
    }

    @Override
    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) {
        return allocateBuffer(ctx, msg, preferDirect, true);
    }

    private ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect,
                                   boolean allowEmptyReturn) {
        int targetBufSize = 0;
        int remaining = msg.readableBytes() + buffer.readableBytes();

        // quick overflow check
        if (remaining < 0) {
            throw new EncoderException("too much data to allocate a buffer for compression");
        }

        while (remaining > 0) {
            int curSize = Math.min(blockSize, remaining);
            remaining -= curSize;
            // calculate the total compressed size of the current block (including header) and add to the total
            targetBufSize += compressor.maxCompressedLength(curSize) + HEADER_LENGTH;
        }

        // Besides the raw byte count, the per-block headers (HEADER_LENGTH each, one per #blockSize chunk)
        // also contribute to targetBufSize. If that sum overflowed it would wrap around to a negative value,
        // so the < 0 comparison below also catches the overflow case.
        if (targetBufSize > maxEncodeSize || 0 > targetBufSize) {
            throw new EncoderException(String.format("requested encode buffer size (%d bytes) exceeds the maximum " +
                                                     "allowable size (%d bytes)", targetBufSize, maxEncodeSize));
        }

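        // If the pending data (including per-block overhead) would not even fill one block, encode(...)
        // only stages it in the backing buffer and writes nothing out yet, so no output buffer is needed.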
        if (allowEmptyReturn && targetBufSize < blockSize) {
            return Unpooled.EMPTY_BUFFER;
        }

        if (preferDirect) {
            return ctx.alloc().ioBuffer(targetBufSize, targetBufSize);
        } else {
            return ctx.alloc().heapBuffer(targetBufSize, targetBufSize);
        }
    }

    /**
     * {@inheritDoc}
     *
     * Encodes the input buffer into {@link #blockSize} chunks in the output buffer. Data is only compressed and
     * written once we hit the {@link #blockSize}; else, it is copied into the backing {@link #buffer} to await
     * more data.
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
        if (finished) {
            if (!out.isWritable(in.readableBytes())) {
                // out should be EMPTY_BUFFER because we should have allocated enough space above in allocateBuffer.
                throw new IllegalStateException("encode finished and not enough space to write remaining data");
            }
            out.writeBytes(in);
            return;
        }

        final ByteBuf buffer = this.buffer;
        int length;
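        // Copy the input into the backing buffer chunk by chunk; every time the buffer fills up to
        // blockSize, compress its contents and emit them as one block into the output buffer.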
        while ((length = in.readableBytes()) > 0) {
            final int nextChunkSize = Math.min(length, buffer.writableBytes());
            in.readBytes(buffer, nextChunkSize);

            if (!buffer.isWritable()) {
                flushBufferedData(out);
            }
        }
    }

    private void flushBufferedData(ByteBuf out) {
        int flushableBytes = buffer.readableBytes();
        if (flushableBytes == 0) {
            return;
        }
        checksum.reset();
        checksum.update(buffer, buffer.readerIndex(), flushableBytes);
        final int check = (int) checksum.getValue();

        final int bufSize = compressor.maxCompressedLength(flushableBytes) + HEADER_LENGTH;
        out.ensureWritable(bufSize);
        final int idx = out.writerIndex();
        int compressedLength;
        try {
            ByteBuffer outNioBuffer = out.internalNioBuffer(idx + HEADER_LENGTH, out.writableBytes() - HEADER_LENGTH);
            int pos = outNioBuffer.position();
            // We always want to start at position 0 as we take care of reusing the buffer in the encode(...) loop.
            compressor.compress(buffer.internalNioBuffer(buffer.readerIndex(), flushableBytes), outNioBuffer);
            compressedLength = outNioBuffer.position() - pos;
        } catch (LZ4Exception e) {
            throw new CompressionException(e);
        }
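        // If compression did not make the block smaller, store it uncompressed so the decoder can simply
        // copy the payload instead of decompressing it.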
        final int blockType;
        if (compressedLength >= flushableBytes) {
            blockType = BLOCK_TYPE_NON_COMPRESSED;
            compressedLength = flushableBytes;
            out.setBytes(idx + HEADER_LENGTH, buffer, buffer.readerIndex(), flushableBytes);
        } else {
            blockType = BLOCK_TYPE_COMPRESSED;
        }

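        // Write the block header in front of the payload: magic number, token (block type combined with
        // the compression level), compressed and decompressed lengths, and the checksum of the original data.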
        out.setLong(idx, MAGIC_NUMBER);
        out.setByte(idx + TOKEN_OFFSET, (byte) (blockType | compressionLevel));
        out.setIntLE(idx + COMPRESSED_LENGTH_OFFSET, compressedLength);
        out.setIntLE(idx + DECOMPRESSED_LENGTH_OFFSET, flushableBytes);
        out.setIntLE(idx + CHECKSUM_OFFSET, check);
        out.writerIndex(idx + HEADER_LENGTH + compressedLength);
        buffer.clear();
    }

    @Override
    public void flush(final ChannelHandlerContext ctx) throws Exception {
        if (buffer != null && buffer.isReadable()) {
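            // Force a real allocation (allowEmptyReturn = false): the buffered bytes must be written now
            // even if they amount to less than a full block.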
            final ByteBuf buf = allocateBuffer(ctx, Unpooled.EMPTY_BUFFER, isPreferDirect(), false);
            flushBufferedData(buf);
            ctx.write(buf);
        }
        ctx.flush();
    }

    private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
        if (finished) {
            promise.setSuccess();
            return promise;
        }
        finished = true;

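        // Allocate room for one final data block; the trailing end-of-stream header is accounted for by
        // the ensureWritable(...) call below.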
        final ByteBuf footer = ctx.alloc().heapBuffer(
                compressor.maxCompressedLength(buffer.readableBytes()) + HEADER_LENGTH);
        flushBufferedData(footer);

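        // The stream ends with an empty NON_COMPRESSED block: both length fields and the checksum are zero,
        // which the corresponding LZ4 frame decoder treats as the end of the compressed stream.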
        footer.ensureWritable(HEADER_LENGTH);
        final int idx = footer.writerIndex();
        footer.setLong(idx, MAGIC_NUMBER);
        footer.setByte(idx + TOKEN_OFFSET, (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel));
        footer.setInt(idx + COMPRESSED_LENGTH_OFFSET, 0);
        footer.setInt(idx + DECOMPRESSED_LENGTH_OFFSET, 0);
        footer.setInt(idx + CHECKSUM_OFFSET, 0);

        footer.writerIndex(idx + HEADER_LENGTH);

        return ctx.writeAndFlush(footer, promise);
    }

    /**
     * Returns {@code true} if and only if the compressed stream has been finished.
     */
    public boolean isClosed() {
        return finished;
    }

    /**
     * Close this {@link Lz4FrameEncoder} and so finish the encoding.
     *
     * The returned {@link ChannelFuture} will be notified once the operation completes.
     */
    public ChannelFuture close() {
        return close(ctx().newPromise());
    }

    /**
     * Close this {@link Lz4FrameEncoder} and so finish the encoding.
     * The given {@link ChannelFuture} will be notified once the operation
     * completes and will also be returned.
     */
    public ChannelFuture close(final ChannelPromise promise) {
        ChannelHandlerContext ctx = ctx();
        EventExecutor executor = ctx.executor();
        if (executor.inEventLoop()) {
            return finishEncode(ctx, promise);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    ChannelFuture f = finishEncode(ctx(), promise);
                    PromiseNotifier.cascade(f, promise);
                }
            });
            return promise;
        }
    }

    @Override
    public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
        ChannelFuture f = finishEncode(ctx, ctx.newPromise());
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                ctx.close(promise);
            }
        });

        if (!f.isDone()) {
            // Ensure the channel is eventually closed even if the write operation does not complete in time.
            ctx.executor().schedule(new Runnable() {
                @Override
                public void run() {
                    ctx.close(promise);
                }
            }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);
        }
    }

    private ChannelHandlerContext ctx() {
        ChannelHandlerContext ctx = this.ctx;
        if (ctx == null) {
            throw new IllegalStateException("not added to a pipeline");
        }
        return ctx;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.ctx = ctx;
        // Ensure we use a heap based ByteBuf.
        buffer = Unpooled.wrappedBuffer(new byte[blockSize]);
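        // wrappedBuffer(...) starts out fully readable (writerIndex == capacity); clear() resets the
        // indices so the backing array can be filled from the beginning.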
        buffer.clear();
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
        if (buffer != null) {
            buffer.release();
            buffer = null;
        }
    }

    final ByteBuf getBackingBuffer() {
        return buffer;
    }
}
419 }