View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.compression;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.ChannelPromiseNotifier;
27  import io.netty.handler.codec.EncoderException;
28  import io.netty.handler.codec.MessageToByteEncoder;
29  import io.netty.util.concurrent.EventExecutor;
30  import io.netty.util.internal.ObjectUtil;
31  import net.jpountz.lz4.LZ4Compressor;
32  import net.jpountz.lz4.LZ4Exception;
33  import net.jpountz.lz4.LZ4Factory;
34  import net.jpountz.xxhash.XXHashFactory;
35  
36  import java.nio.ByteBuffer;
37  import java.util.concurrent.TimeUnit;
38  import java.util.zip.Checksum;
39  
40  import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
41  import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_NON_COMPRESSED;
42  import static io.netty.handler.codec.compression.Lz4Constants.CHECKSUM_OFFSET;
43  import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSED_LENGTH_OFFSET;
44  import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSION_LEVEL_BASE;
45  import static io.netty.handler.codec.compression.Lz4Constants.DECOMPRESSED_LENGTH_OFFSET;
46  import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_BLOCK_SIZE;
47  import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
48  import static io.netty.handler.codec.compression.Lz4Constants.HEADER_LENGTH;
49  import static io.netty.handler.codec.compression.Lz4Constants.MAGIC_NUMBER;
50  import static io.netty.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
51  import static io.netty.handler.codec.compression.Lz4Constants.MIN_BLOCK_SIZE;
52  import static io.netty.handler.codec.compression.Lz4Constants.TOKEN_OFFSET;
53  import static io.netty.util.internal.ThrowableUtil.unknownStackTrace;
54  
55  /**
56   * Compresses a {@link ByteBuf} using the LZ4 format.
57   *
58   * See original <a href="https://github.com/Cyan4973/lz4">LZ4 Github project</a>
59   * and <a href="http://fastcompression.blogspot.ru/2011/05/lz4-explained.html">LZ4 block format</a>
60   * for full description.
61   *
62   * Since the original LZ4 block format does not contain the size of the compressed block or the size of the original data
63   * this encoder uses format like <a href="https://github.com/idelpivnitskiy/lz4-java">LZ4 Java</a> library
64   * written by Adrien Grand and approved by Yann Collet (author of original LZ4 library).
65   *
66   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
67   *  * Magic * Token *  Compressed *  Decompressed *  Checksum *  +  *  LZ4 compressed *
68   *  *       *       *    length   *     length    *           *     *      block      *
69   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
70   */
71  public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
72      private static final EncoderException ENCODE_FINSHED_EXCEPTION = unknownStackTrace(new EncoderException(
73                      new IllegalStateException("encode finished and not enough space to write remaining data")),
74                      Lz4FrameEncoder.class, "encode");
75      static final int DEFAULT_MAX_ENCODE_SIZE = Integer.MAX_VALUE;
76  
77      private final int blockSize;
78  
79      /**
80       * Underlying compressor in use.
81       */
82      private final LZ4Compressor compressor;
83  
84      /**
85       * Underlying checksum calculator in use.
86       */
87      private final ByteBufChecksum checksum;
88  
89      /**
90       * Compression level of current LZ4 encoder (depends on {@link #blockSize}).
91       */
92      private final int compressionLevel;
93  
94      /**
95       * Inner byte buffer for outgoing data. It's capacity will be {@link #blockSize}.
96       */
97      private ByteBuf buffer;
98  
99      /**
100      * Maximum size for any buffer to write encoded (compressed) data into.
101      */
102     private final int maxEncodeSize;
103 
104     /**
105      * Indicates if the compressed stream has been finished.
106      */
107     private volatile boolean finished;
108 
109     /**
110      * Used to interact with its {@link ChannelPipeline} and other handlers.
111      */
112     private volatile ChannelHandlerContext ctx;
113 
114     /**
115      * Creates the fastest LZ4 encoder with default block size (64 KB)
116      * and xxhash hashing for Java, based on Yann Collet's work available at
117      * <a href="https://github.com/Cyan4973/xxHash">Github</a>.
118      */
119     public Lz4FrameEncoder() {
120         this(false);
121     }
122 
123     /**
124      * Creates a new LZ4 encoder with hight or fast compression, default block size (64 KB)
125      * and xxhash hashing for Java, based on Yann Collet's work available at
126      * <a href="https://github.com/Cyan4973/xxHash">Github</a>.
127      *
128      * @param highCompressor  if {@code true} codec will use compressor which requires more memory
129      *                        and is slower but compresses more efficiently
130      */
131     public Lz4FrameEncoder(boolean highCompressor) {
132         this(LZ4Factory.fastestInstance(), highCompressor, DEFAULT_BLOCK_SIZE,
133                 XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum());
134     }
135 
136     /**
137      * Creates a new customizable LZ4 encoder.
138      *
139      * @param factory         user customizable {@link LZ4Factory} instance
140      *                        which may be JNI bindings to the original C implementation, a pure Java implementation
141      *                        or a Java implementation that uses the {@link sun.misc.Unsafe}
142      * @param highCompressor  if {@code true} codec will use compressor which requires more memory
143      *                        and is slower but compresses more efficiently
144      * @param blockSize       the maximum number of bytes to try to compress at once,
145      *                        must be >= 64 and <= 32 M
146      * @param checksum        the {@link Checksum} instance to use to check data for integrity
147      */
148     public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize, Checksum checksum) {
149         this(factory, highCompressor, blockSize, checksum, DEFAULT_MAX_ENCODE_SIZE);
150     }
151 
152         /**
153          * Creates a new customizable LZ4 encoder.
154          *
155          * @param factory         user customizable {@link LZ4Factory} instance
156          *                        which may be JNI bindings to the original C implementation, a pure Java implementation
157          *                        or a Java implementation that uses the {@link sun.misc.Unsafe}
158          * @param highCompressor  if {@code true} codec will use compressor which requires more memory
159          *                        and is slower but compresses more efficiently
160          * @param blockSize       the maximum number of bytes to try to compress at once,
161          *                        must be >= 64 and <= 32 M
162          * @param checksum        the {@link Checksum} instance to use to check data for integrity
163          * @param maxEncodeSize   the maximum size for an encode (compressed) buffer
164          */
165     public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize,
166                            Checksum checksum, int maxEncodeSize) {
167         if (factory == null) {
168             throw new NullPointerException("factory");
169         }
170         if (checksum == null) {
171             throw new NullPointerException("checksum");
172         }
173 
174         compressor = highCompressor ? factory.highCompressor() : factory.fastCompressor();
175         this.checksum = ByteBufChecksum.wrapChecksum(checksum);
176 
177         compressionLevel = compressionLevel(blockSize);
178         this.blockSize = blockSize;
179         this.maxEncodeSize = ObjectUtil.checkPositive(maxEncodeSize, "maxEncodeSize");
180         finished = false;
181     }
182 
183     /**
184      * Calculates compression level on the basis of block size.
185      */
186     private static int compressionLevel(int blockSize) {
187         if (blockSize < MIN_BLOCK_SIZE || blockSize > MAX_BLOCK_SIZE) {
188             throw new IllegalArgumentException(String.format(
189                     "blockSize: %d (expected: %d-%d)", blockSize, MIN_BLOCK_SIZE, MAX_BLOCK_SIZE));
190         }
191         int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1); // ceil of log2
192         compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
193         return compressionLevel;
194     }
195 
196     @Override
197     protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) {
198         return allocateBuffer(ctx, msg, preferDirect, true);
199     }
200 
201     private ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect,
202                                    boolean allowEmptyReturn) {
203         int targetBufSize = 0;
204         int remaining = msg.readableBytes() + buffer.readableBytes();
205 
206         // quick overflow check
207         if (remaining < 0) {
208             throw new EncoderException("too much data to allocate a buffer for compression");
209         }
210 
211         while (remaining > 0) {
212             int curSize = Math.min(blockSize, remaining);
213             remaining -= curSize;
214             // calculate the total compressed size of the current block (including header) and add to the total
215             targetBufSize += compressor.maxCompressedLength(curSize) + HEADER_LENGTH;
216         }
217 
218         // in addition to just the raw byte count, the headers (HEADER_LENGTH) per block (configured via
219         // #blockSize) will also add to the targetBufSize, and the combination of those would never wrap around
220         // again to be >= 0, this is a good check for the overflow case.
221         if (targetBufSize > maxEncodeSize || 0 > targetBufSize) {
222             throw new EncoderException(String.format("requested encode buffer size (%d bytes) exceeds the maximum " +
223                                                      "allowable size (%d bytes)", targetBufSize, maxEncodeSize));
224         }
225 
226         if (allowEmptyReturn && targetBufSize < blockSize) {
227             return Unpooled.EMPTY_BUFFER;
228         }
229 
230         if (preferDirect) {
231             return ctx.alloc().ioBuffer(targetBufSize, targetBufSize);
232         } else {
233             return ctx.alloc().heapBuffer(targetBufSize, targetBufSize);
234         }
235     }
236 
237     /**
238      * {@inheritDoc}
239      *
240      * Encodes the input buffer into {@link #blockSize} chunks in the output buffer. Data is only compressed and
241      * written once we hit the {@link #blockSize}; else, it is copied into the backing {@link #buffer} to await
242      * more data.
243      */
244     @Override
245     protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
246         if (finished) {
247             if (!out.isWritable(in.readableBytes())) {
248                 // out should be EMPTY_BUFFER because we should have allocated enough space above in allocateBuffer.
249                 throw ENCODE_FINSHED_EXCEPTION;
250             }
251             out.writeBytes(in);
252             return;
253         }
254 
255         final ByteBuf buffer = this.buffer;
256         int length;
257         while ((length = in.readableBytes()) > 0) {
258             final int nextChunkSize = Math.min(length, buffer.writableBytes());
259             in.readBytes(buffer, nextChunkSize);
260 
261             if (!buffer.isWritable()) {
262                 flushBufferedData(out);
263             }
264         }
265     }
266 
267     private void flushBufferedData(ByteBuf out) {
268         int flushableBytes = buffer.readableBytes();
269         if (flushableBytes == 0) {
270             return;
271         }
272         checksum.reset();
273         checksum.update(buffer, buffer.readerIndex(), flushableBytes);
274         final int check = (int) checksum.getValue();
275 
276         final int bufSize = compressor.maxCompressedLength(flushableBytes) + HEADER_LENGTH;
277         out.ensureWritable(bufSize);
278         final int idx = out.writerIndex();
279         int compressedLength;
280         try {
281             ByteBuffer outNioBuffer = out.internalNioBuffer(idx + HEADER_LENGTH, out.writableBytes() - HEADER_LENGTH);
282             int pos = outNioBuffer.position();
283             // We always want to start at position 0 as we take care of reusing the buffer in the encode(...) loop.
284             compressor.compress(buffer.internalNioBuffer(buffer.readerIndex(), flushableBytes), outNioBuffer);
285             compressedLength = outNioBuffer.position() - pos;
286         } catch (LZ4Exception e) {
287             throw new CompressionException(e);
288         }
289         final int blockType;
290         if (compressedLength >= flushableBytes) {
291             blockType = BLOCK_TYPE_NON_COMPRESSED;
292             compressedLength = flushableBytes;
293             out.setBytes(idx + HEADER_LENGTH, buffer, 0, flushableBytes);
294         } else {
295             blockType = BLOCK_TYPE_COMPRESSED;
296         }
297 
298         out.setLong(idx, MAGIC_NUMBER);
299         out.setByte(idx + TOKEN_OFFSET, (byte) (blockType | compressionLevel));
300         out.setIntLE(idx + COMPRESSED_LENGTH_OFFSET, compressedLength);
301         out.setIntLE(idx + DECOMPRESSED_LENGTH_OFFSET, flushableBytes);
302         out.setIntLE(idx + CHECKSUM_OFFSET, check);
303         out.writerIndex(idx + HEADER_LENGTH + compressedLength);
304         buffer.clear();
305     }
306 
307     @Override
308     public void flush(final ChannelHandlerContext ctx) throws Exception {
309         if (buffer != null && buffer.isReadable()) {
310             final ByteBuf buf = allocateBuffer(ctx, Unpooled.EMPTY_BUFFER, isPreferDirect(), false);
311             flushBufferedData(buf);
312             ctx.write(buf);
313         }
314         ctx.flush();
315     }
316 
317     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
318         if (finished) {
319             promise.setSuccess();
320             return promise;
321         }
322         finished = true;
323 
324         final ByteBuf footer = ctx.alloc().heapBuffer(
325                 compressor.maxCompressedLength(buffer.readableBytes()) + HEADER_LENGTH);
326         flushBufferedData(footer);
327 
328         final int idx = footer.writerIndex();
329         footer.setLong(idx, MAGIC_NUMBER);
330         footer.setByte(idx + TOKEN_OFFSET, (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel));
331         footer.setInt(idx + COMPRESSED_LENGTH_OFFSET, 0);
332         footer.setInt(idx + DECOMPRESSED_LENGTH_OFFSET, 0);
333         footer.setInt(idx + CHECKSUM_OFFSET, 0);
334 
335         footer.writerIndex(idx + HEADER_LENGTH);
336 
337         return ctx.writeAndFlush(footer, promise);
338     }
339 
340     /**
341      * Returns {@code true} if and only if the compressed stream has been finished.
342      */
343     public boolean isClosed() {
344         return finished;
345     }
346 
347     /**
348      * Close this {@link Lz4FrameEncoder} and so finish the encoding.
349      *
350      * The returned {@link ChannelFuture} will be notified once the operation completes.
351      */
352     public ChannelFuture close() {
353         return close(ctx().newPromise());
354     }
355 
356     /**
357      * Close this {@link Lz4FrameEncoder} and so finish the encoding.
358      * The given {@link ChannelFuture} will be notified once the operation
359      * completes and will also be returned.
360      */
361     public ChannelFuture close(final ChannelPromise promise) {
362         ChannelHandlerContext ctx = ctx();
363         EventExecutor executor = ctx.executor();
364         if (executor.inEventLoop()) {
365             return finishEncode(ctx, promise);
366         } else {
367             executor.execute(new Runnable() {
368                 @Override
369                 public void run() {
370                     ChannelFuture f = finishEncode(ctx(), promise);
371                     f.addListener(new ChannelPromiseNotifier(promise));
372                 }
373             });
374             return promise;
375         }
376     }
377 
378     @Override
379     public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
380         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
381         f.addListener(new ChannelFutureListener() {
382             @Override
383             public void operationComplete(ChannelFuture f) throws Exception {
384                 ctx.close(promise);
385             }
386         });
387 
388         if (!f.isDone()) {
389             // Ensure the channel is closed even if the write operation completes in time.
390             ctx.executor().schedule(new Runnable() {
391                 @Override
392                 public void run() {
393                     ctx.close(promise);
394                 }
395             }, 10, TimeUnit.SECONDS); // FIXME: Magic number
396         }
397     }
398 
399     private ChannelHandlerContext ctx() {
400         ChannelHandlerContext ctx = this.ctx;
401         if (ctx == null) {
402             throw new IllegalStateException("not added to a pipeline");
403         }
404         return ctx;
405     }
406 
407     @Override
408     public void handlerAdded(ChannelHandlerContext ctx) {
409         this.ctx = ctx;
410         // Ensure we use a heap based ByteBuf.
411         buffer = Unpooled.wrappedBuffer(new byte[blockSize]);
412         buffer.clear();
413     }
414 
415     @Override
416     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
417         super.handlerRemoved(ctx);
418         if (buffer != null) {
419             buffer.release();
420             buffer = null;
421         }
422     }
423 
424     final ByteBuf getBackingBuffer() {
425         return buffer;
426     }
427 }