View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.compression;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelPipeline;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.ChannelPromiseNotifier;
26  import io.netty.handler.codec.MessageToByteEncoder;
27  import io.netty.util.concurrent.EventExecutor;
28  import net.jpountz.lz4.LZ4Compressor;
29  import net.jpountz.lz4.LZ4Exception;
30  import net.jpountz.lz4.LZ4Factory;
31  import net.jpountz.xxhash.XXHashFactory;
32  
33  import java.util.concurrent.TimeUnit;
34  import java.util.zip.Checksum;
35  
36  import static io.netty.handler.codec.compression.Lz4Constants.*;
37  
38  /**
39   * Compresses a {@link ByteBuf} using the LZ4 format.
40   *
41   * See original <a href="http://code.google.com/p/lz4/">LZ4 website</a>
42   * and <a href="http://fastcompression.blogspot.ru/2011/05/lz4-explained.html">LZ4 block format</a>
43   * for full description.
44   *
45   * Since the original LZ4 block format does not contains size of compressed block and size of original data
46   * this encoder uses format like <a href="https://github.com/idelpivnitskiy/lz4-java">LZ4 Java</a> library
47   * written by Adrien Grand and approved by Yann Collet (author of original LZ4 library).
48   *
49   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
50   *  * Magic * Token *  Compressed *  Decompressed *  Checksum *  +  *  LZ4 compressed *
51   *  *       *       *    length   *     length    *           *     *      block      *
52   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
53   */
54  public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
55      /**
56       * Underlying compressor in use.
57       */
58      private LZ4Compressor compressor;
59  
60      /**
61       * Underlying checksum calculator in use.
62       */
63      private Checksum checksum;
64  
65      /**
66       * Compression level of current LZ4 encoder (depends on {@link #compressedBlockSize}).
67       */
68      private final int compressionLevel;
69  
70      /**
71       * Inner byte buffer for outgoing data.
72       */
73      private byte[] buffer;
74  
75      /**
76       * Current length of buffered bytes in {@link #buffer}.
77       */
78      private int currentBlockLength;
79  
80      /**
81       * Maximum size of compressed block with header.
82       */
83      private final int compressedBlockSize;
84  
85      /**
86       * Indicates if the compressed stream has been finished.
87       */
88      private volatile boolean finished;
89  
90      /**
91       * Used to interact with its {@link ChannelPipeline} and other handlers.
92       */
93      private volatile ChannelHandlerContext ctx;
94  
95      /**
96       * Creates the fastest LZ4 encoder with default block size (64 KB)
97       * and xxhash hashing for Java, based on Yann Collet's work available at
98       * <a href="http://code.google.com/p/xxhash/">Google Code</a>.
99       */
100     public Lz4FrameEncoder() {
101         this(false);
102     }
103 
104     /**
105      * Creates a new LZ4 encoder with hight or fast compression, default block size (64 KB)
106      * and xxhash hashing for Java, based on Yann Collet's work available at
107      * <a href="http://code.google.com/p/xxhash/">Google Code</a>.
108      *
109      * @param highCompressor  if {@code true} codec will use compressor which requires more memory
110      *                        and is slower but compresses more efficiently
111      */
112     public Lz4FrameEncoder(boolean highCompressor) {
113         this(LZ4Factory.fastestInstance(), highCompressor, DEFAULT_BLOCK_SIZE,
114                 XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum());
115     }
116 
117     /**
118      * Creates a new customizable LZ4 encoder.
119      *
120      * @param factory         user customizable {@link net.jpountz.lz4.LZ4Factory} instance
121      *                        which may be JNI bindings to the original C implementation, a pure Java implementation
122      *                        or a Java implementation that uses the {@link sun.misc.Unsafe}
123      * @param highCompressor  if {@code true} codec will use compressor which requires more memory
124      *                        and is slower but compresses more efficiently
125      * @param blockSize       the maximum number of bytes to try to compress at once,
126      *                        must be >= 64 and <= 32 M
127      * @param checksum        the {@link Checksum} instance to use to check data for integrity
128      */
129     public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize, Checksum checksum) {
130         super(false);
131         if (factory == null) {
132             throw new NullPointerException("factory");
133         }
134         if (checksum == null) {
135             throw new NullPointerException("checksum");
136         }
137 
138         compressor = highCompressor ? factory.highCompressor() : factory.fastCompressor();
139         this.checksum = checksum;
140 
141         compressionLevel = compressionLevel(blockSize);
142         buffer = new byte[blockSize];
143         currentBlockLength = 0;
144         compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize);
145 
146         finished = false;
147     }
148 
149     /**
150      * Calculates compression level on the basis of block size.
151      */
152     private static int compressionLevel(int blockSize) {
153         if (blockSize < MIN_BLOCK_SIZE || blockSize > MAX_BLOCK_SIZE) {
154             throw new IllegalArgumentException(String.format(
155                     "blockSize: %d (expected: %d-%d)", blockSize, MIN_BLOCK_SIZE, MAX_BLOCK_SIZE));
156         }
157         int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1); // ceil of log2
158         compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
159         return compressionLevel;
160     }
161 
162     @Override
163     protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
164         if (finished) {
165             out.writeBytes(in);
166             return;
167         }
168 
169         int length = in.readableBytes();
170 
171         final byte[] buffer = this.buffer;
172         final int blockSize = buffer.length;
173         while (currentBlockLength + length >= blockSize) {
174             final int tail = blockSize - currentBlockLength;
175             in.getBytes(in.readerIndex(), buffer, currentBlockLength, tail);
176             currentBlockLength = blockSize;
177             flushBufferedData(out);
178             in.skipBytes(tail);
179             length -= tail;
180         }
181         in.readBytes(buffer, currentBlockLength, length);
182         currentBlockLength += length;
183     }
184 
185     private void flushBufferedData(ByteBuf out) {
186         int currentBlockLength = this.currentBlockLength;
187         if (currentBlockLength == 0) {
188             return;
189         }
190         checksum.reset();
191         checksum.update(buffer, 0, currentBlockLength);
192         final int check = (int) checksum.getValue();
193 
194         out.ensureWritable(compressedBlockSize);
195         final int idx = out.writerIndex();
196         final byte[] dest = out.array();
197         final int destOff = out.arrayOffset() + idx;
198         int compressedLength;
199         try {
200             compressedLength = compressor.compress(buffer, 0, currentBlockLength, dest, destOff + HEADER_LENGTH);
201         } catch (LZ4Exception e) {
202             throw new CompressionException(e);
203         }
204         final int blockType;
205         if (compressedLength >= currentBlockLength) {
206             blockType = BLOCK_TYPE_NON_COMPRESSED;
207             compressedLength = currentBlockLength;
208             System.arraycopy(buffer, 0, dest, destOff + HEADER_LENGTH, currentBlockLength);
209         } else {
210             blockType = BLOCK_TYPE_COMPRESSED;
211         }
212 
213         out.setLong(idx, MAGIC_NUMBER);
214         dest[destOff + TOKEN_OFFSET] = (byte) (blockType | compressionLevel);
215         writeIntLE(compressedLength, dest, destOff + COMPRESSED_LENGTH_OFFSET);
216         writeIntLE(currentBlockLength, dest, destOff + DECOMPRESSED_LENGTH_OFFSET);
217         writeIntLE(check, dest, destOff + CHECKSUM_OFFSET);
218         out.writerIndex(idx + HEADER_LENGTH + compressedLength);
219         currentBlockLength = 0;
220 
221         this.currentBlockLength = currentBlockLength;
222     }
223 
224     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
225         if (finished) {
226             promise.setSuccess();
227             return promise;
228         }
229         finished = true;
230 
231         final ByteBuf footer = ctx.alloc().heapBuffer(
232                 compressor.maxCompressedLength(currentBlockLength) + HEADER_LENGTH);
233         flushBufferedData(footer);
234 
235         final int idx = footer.writerIndex();
236         final byte[] dest = footer.array();
237         final int destOff = footer.arrayOffset() + idx;
238         footer.setLong(idx, MAGIC_NUMBER);
239         dest[destOff + TOKEN_OFFSET] = (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel);
240         writeIntLE(0, dest, destOff + COMPRESSED_LENGTH_OFFSET);
241         writeIntLE(0, dest, destOff + DECOMPRESSED_LENGTH_OFFSET);
242         writeIntLE(0, dest, destOff + CHECKSUM_OFFSET);
243         footer.writerIndex(idx + HEADER_LENGTH);
244 
245         compressor = null;
246         checksum = null;
247         buffer = null;
248 
249         return ctx.writeAndFlush(footer, promise);
250     }
251 
252     /**
253      * Writes {@code int} value into the byte buffer with little-endian format.
254      */
255     private static void writeIntLE(int i, byte[] buf, int off) {
256         buf[off++] = (byte) i;
257         buf[off++] = (byte) (i >>> 8);
258         buf[off++] = (byte) (i >>> 16);
259         buf[off]   = (byte) (i >>> 24);
260     }
261 
262     /**
263      * Returns {@code true} if and only if the compressed stream has been finished.
264      */
265     public boolean isClosed() {
266         return finished;
267     }
268 
269     /**
270      * Close this {@link Lz4FrameEncoder} and so finish the encoding.
271      *
272      * The returned {@link ChannelFuture} will be notified once the operation completes.
273      */
274     public ChannelFuture close() {
275         return close(ctx().newPromise());
276     }
277 
278     /**
279      * Close this {@link Lz4FrameEncoder} and so finish the encoding.
280      * The given {@link ChannelFuture} will be notified once the operation
281      * completes and will also be returned.
282      */
283     public ChannelFuture close(final ChannelPromise promise) {
284         ChannelHandlerContext ctx = ctx();
285         EventExecutor executor = ctx.executor();
286         if (executor.inEventLoop()) {
287             return finishEncode(ctx, promise);
288         } else {
289             executor.execute(new Runnable() {
290                 @Override
291                 public void run() {
292                     ChannelFuture f = finishEncode(ctx(), promise);
293                     f.addListener(new ChannelPromiseNotifier(promise));
294                 }
295             });
296             return promise;
297         }
298     }
299 
300     @Override
301     public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
302         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
303         f.addListener(new ChannelFutureListener() {
304             @Override
305             public void operationComplete(ChannelFuture f) throws Exception {
306                 ctx.close(promise);
307             }
308         });
309 
310         if (!f.isDone()) {
311             // Ensure the channel is closed even if the write operation completes in time.
312             ctx.executor().schedule(new Runnable() {
313                 @Override
314                 public void run() {
315                     ctx.close(promise);
316                 }
317             }, 10, TimeUnit.SECONDS); // FIXME: Magic number
318         }
319     }
320 
321     private ChannelHandlerContext ctx() {
322         ChannelHandlerContext ctx = this.ctx;
323         if (ctx == null) {
324             throw new IllegalStateException("not added to a pipeline");
325         }
326         return ctx;
327     }
328 
329     @Override
330     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
331         this.ctx = ctx;
332     }
333 }