View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.internal.ObjectUtil;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.zip.Checksum;

import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_NON_COMPRESSED;
import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSION_LEVEL_BASE;
import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
import static io.netty.handler.codec.compression.Lz4Constants.HEADER_LENGTH;
import static io.netty.handler.codec.compression.Lz4Constants.MAGIC_NUMBER;
import static io.netty.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
36  
37  /**
38   * Uncompresses a {@link ByteBuf} encoded with the LZ4 format.
39   *
40   * See original <a href="https://github.com/Cyan4973/lz4">LZ4 Github project</a>
41   * and <a href="https://fastcompression.blogspot.ru/2011/05/lz4-explained.html">LZ4 block format</a>
42   * for full description.
43   *
44   * Since the original LZ4 block format does not contains size of compressed block and size of original data
45   * this encoder uses format like <a href="https://github.com/idelpivnitskiy/lz4-java">LZ4 Java</a> library
46   * written by Adrien Grand and approved by Yann Collet (author of original LZ4 library).
47   *
48   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
49   *  * Magic * Token *  Compressed *  Decompressed *  Checksum *  +  *  LZ4 compressed *
50   *  *       *       *    length   *     length    *           *     *      block      *
51   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
52   */
53  public class Lz4FrameDecoder extends ByteToMessageDecoder {
54      private final int maxDecompressedLength;
55      /**
56       * Current state of stream.
57       */
58      private enum State {
59          INIT_BLOCK,
60          DECOMPRESS_DATA,
61          FINISHED,
62          CORRUPTED
63      }
64  
65      private State currentState = State.INIT_BLOCK;
66  
67      /**
68       * Underlying decompressor in use.
69       */
70      private LZ4FastDecompressor decompressor;
71  
72      /**
73       * Underlying checksum calculator in use.
74       */
75      private ByteBufChecksum checksum;
76  
77      /**
78       * Type of current block.
79       */
80      private int blockType;
81  
82      /**
83       * Compressed length of current incoming block.
84       */
85      private int compressedLength;
86  
87      /**
88       * Decompressed length of current incoming block.
89       */
90      private int decompressedLength;
91  
92      /**
93       * Checksum value of current incoming block.
94       */
95      private int currentChecksum;
96  
97      /**
98       * Creates the fastest LZ4 decoder.
99       *
100      * Note that by default, validation of the checksum header in each chunk is
101      * DISABLED for performance improvements. If performance is less of an issue,
102      * or if you would prefer the safety that checksum validation brings, please
103      * use the {@link #Lz4FrameDecoder(boolean)} constructor with the argument
104      * set to {@code true}.
105      */
106     public Lz4FrameDecoder() {
107         this(false);
108     }
109 
110     /**
111      * Creates a LZ4 decoder with fastest decoder instance available on your machine.
112      *
113      * @param validateChecksums  if {@code true}, the checksum field will be validated against the actual
114      *                           uncompressed data, and if the checksums do not match, a suitable
115      *                           {@link DecompressionException} will be thrown
116      */
117     public Lz4FrameDecoder(boolean validateChecksums) {
118         this(LZ4Factory.fastestInstance(), validateChecksums);
119     }
120 
121     /**
122      * Creates a LZ4 decoder with fastest decoder instance available on your machine.
123      *
124      * @param validateChecksums  if {@code true}, the checksum field will be validated against the actual
125      *                           uncompressed data, and if the checksums do not match, a suitable
126      *                           {@link DecompressionException} will be thrown
127      * @param maxDecompressedLength
128      *                          maximum length of the decompressed block. If {@code 0} is given it uses {@code 32MB}
129      *                          by default.
130      */
131     public Lz4FrameDecoder(boolean validateChecksums, int maxDecompressedLength) {
132         this(LZ4Factory.fastestInstance(), validateChecksums ? new Lz4XXHash32(DEFAULT_SEED) : null,
133                 maxDecompressedLength);
134     }
135 
136     /**
137      * Creates a new LZ4 decoder with customizable implementation.
138      *
139      * @param factory            user customizable {@link LZ4Factory} instance
140      *                           which may be JNI bindings to the original C implementation, a pure Java implementation
141      *                           or a Java implementation that uses the {@link sun.misc.Unsafe}
142      * @param validateChecksums  if {@code true}, the checksum field will be validated against the actual
143      *                           uncompressed data, and if the checksums do not match, a suitable
144      *                           {@link DecompressionException} will be thrown. In this case encoder will use
145      *                           xxhash hashing for Java, based on Yann Collet's work available at
146      *                           <a href="https://github.com/Cyan4973/xxHash">Github</a>.
147      */
148     public Lz4FrameDecoder(LZ4Factory factory, boolean validateChecksums) {
149         this(factory, validateChecksums ? new Lz4XXHash32(DEFAULT_SEED) : null);
150     }
151 
152     /**
153      * Creates a new customizable LZ4 decoder.
154      *
155      * @param factory   user customizable {@link LZ4Factory} instance
156      *                  which may be JNI bindings to the original C implementation, a pure Java implementation
157      *                  or a Java implementation that uses the {@link sun.misc.Unsafe}
158      * @param checksum  the {@link Checksum} instance to use to check data for integrity.
159      *                  You may set {@code null} if you do not want to validate checksum of each block
160      */
161     public Lz4FrameDecoder(LZ4Factory factory, Checksum checksum) {
162         this(factory, checksum, MAX_BLOCK_SIZE);
163     }
164 
165     /**
166      * Creates a new customizable LZ4 decoder.
167      *
168      * @param factory   user customizable {@link LZ4Factory} instance
169      *                  which may be JNI bindings to the original C implementation, a pure Java implementation
170      *                  or a Java implementation that uses the {@link sun.misc.Unsafe}
171      * @param checksum  the {@link Checksum} instance to use to check data for integrity.
172      *                  You may set {@code null} if you do not want to validate checksum of each block
173      * @param maxDecompressedLength
174      *                  maximum length of the decompressed block. If {@code 0} is given it uses {@code 32MB} by default.
175      */
176     public Lz4FrameDecoder(LZ4Factory factory, Checksum checksum, int maxDecompressedLength) {
177         decompressor = ObjectUtil.checkNotNull(factory, "factory").fastDecompressor();
178         this.checksum = checksum == null ? null : ByteBufChecksum.wrapChecksum(checksum);
179         this.maxDecompressedLength = maxDecompressedLength == 0 ? MAX_BLOCK_SIZE :
180                 ObjectUtil.checkInRange(maxDecompressedLength, 0, MAX_BLOCK_SIZE, "maxDecompressedLength");
181     }
182 
183     @Override
184     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
185         try {
186             switch (currentState) {
187             case INIT_BLOCK:
188                 if (in.readableBytes() < HEADER_LENGTH) {
189                     break;
190                 }
191                 final long magic = in.readLong();
192                 if (magic != MAGIC_NUMBER) {
193                     throw new DecompressionException("unexpected block identifier");
194                 }
195 
196                 final int token = in.readByte();
197                 final int compressionLevel = (token & 0x0F) + COMPRESSION_LEVEL_BASE;
198                 int blockType = token & 0xF0;
199 
200                 int compressedLength = Integer.reverseBytes(in.readInt());
201                 if (compressedLength < 0 || compressedLength > MAX_BLOCK_SIZE) {
202                     throw new DecompressionException(String.format(
203                             "invalid compressedLength: %d (expected: 0-%d)",
204                             compressedLength, MAX_BLOCK_SIZE));
205                 }
206 
207                 int decompressedLength = Integer.reverseBytes(in.readInt());
208                 if (decompressedLength > maxDecompressedLength) {
209                     throw new DecompressionException(String.format(
210                             "decompressedLength too large: %d (expected: 0-%d)",
211                             decompressedLength, maxDecompressedLength));
212                 }
213 
214                 final int maxLocalDecompressedLength = 1 << compressionLevel;
215                 if (decompressedLength < 0 || decompressedLength > maxLocalDecompressedLength) {
216                     throw new DecompressionException(String.format(
217                             "invalid decompressedLength: %d (expected: 0-%d)",
218                             decompressedLength, maxLocalDecompressedLength));
219                 }
220                 if (decompressedLength == 0 && compressedLength != 0
221                         || decompressedLength != 0 && compressedLength == 0
222                         || blockType == BLOCK_TYPE_NON_COMPRESSED && decompressedLength != compressedLength) {
223                     throw new DecompressionException(String.format(
224                             "stream corrupted: compressedLength(%d) and decompressedLength(%d) mismatch",
225                             compressedLength, decompressedLength));
226                 }
227 
228                 int currentChecksum = Integer.reverseBytes(in.readInt());
229                 if (decompressedLength == 0 && compressedLength == 0) {
230                     if (currentChecksum != 0) {
231                         throw new DecompressionException("stream corrupted: checksum error");
232                     }
233                     currentState = State.FINISHED;
234                     decompressor = null;
235                     checksum = null;
236                     break;
237                 }
238 
239                 this.blockType = blockType;
240                 this.compressedLength = compressedLength;
241                 this.decompressedLength = decompressedLength;
242                 this.currentChecksum = currentChecksum;
243 
244                 currentState = State.DECOMPRESS_DATA;
245                 // fall through
246             case DECOMPRESS_DATA:
247                 blockType = this.blockType;
248                 compressedLength = this.compressedLength;
249                 decompressedLength = this.decompressedLength;
250                 currentChecksum = this.currentChecksum;
251 
252                 if (in.readableBytes() < compressedLength) {
253                     break;
254                 }
255 
256                 final ByteBufChecksum checksum = this.checksum;
257                 ByteBuf uncompressed = null;
258 
259                 try {
260                     switch (blockType) {
261                         case BLOCK_TYPE_NON_COMPRESSED:
262                             // Just pass through, we not update the readerIndex yet as we do this outside of the
263                             // switch statement.
264                             uncompressed = in.retainedSlice(in.readerIndex(), decompressedLength);
265                             break;
266                         case BLOCK_TYPE_COMPRESSED:
267                             uncompressed = ctx.alloc().buffer(decompressedLength, decompressedLength);
268 
269                             decompressor.decompress(CompressionUtil.safeReadableNioBuffer(in),
270                                     uncompressed.internalNioBuffer(uncompressed.writerIndex(), decompressedLength));
271                             // Update the writerIndex now to reflect what we decompressed.
272                             uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength);
273                             break;
274                         default:
275                             throw new DecompressionException(String.format(
276                                     "unexpected blockType: %d (expected: %d or %d)",
277                                     blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
278                     }
279                     // Skip inbound bytes after we processed them.
280                     in.skipBytes(compressedLength);
281 
282                     if (checksum != null) {
283                         CompressionUtil.checkChecksum(checksum, uncompressed, currentChecksum);
284                     }
285                     out.add(uncompressed);
286                     uncompressed = null;
287                     currentState = State.INIT_BLOCK;
288                 } catch (LZ4Exception e) {
289                     throw new DecompressionException(e);
290                 } finally {
291                     if (uncompressed != null) {
292                         uncompressed.release();
293                     }
294                 }
295                 break;
296             case FINISHED:
297             case CORRUPTED:
298                 in.skipBytes(in.readableBytes());
299                 break;
300             default:
301                 throw new IllegalStateException();
302             }
303         } catch (Exception e) {
304             currentState = State.CORRUPTED;
305             throw e;
306         }
307     }
308 
309     /**
310      * Returns {@code true} if and only if the end of the compressed stream
311      * has been reached.
312      */
313     public boolean isClosed() {
314         return currentState == State.FINISHED;
315     }
316 }