View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
import net.jpountz.xxhash.XXHashFactory;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.zip.Checksum;

import static io.netty.handler.codec.compression.Lz4Constants.*;
30  
31  /**
32   * Uncompresses a {@link ByteBuf} encoded with the LZ4 format.
33   *
34   * See original <a href="https://github.com/Cyan4973/lz4">LZ4 Github project</a>
35   * and <a href="http://fastcompression.blogspot.ru/2011/05/lz4-explained.html">LZ4 block format</a>
36   * for full description.
37   *
38   * Since the original LZ4 block format does not contains size of compressed block and size of original data
39   * this encoder uses format like <a href="https://github.com/idelpivnitskiy/lz4-java">LZ4 Java</a> library
40   * written by Adrien Grand and approved by Yann Collet (author of original LZ4 library).
41   *
42   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
43   *  * Magic * Token *  Compressed *  Decompressed *  Checksum *  +  *  LZ4 compressed *
44   *  *       *       *    length   *     length    *           *     *      block      *
45   *  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *     * * * * * * * * * *
46   */
47  public class Lz4FrameDecoder extends ByteToMessageDecoder {
48      /**
49       * Current state of stream.
50       */
51      private enum State {
52          INIT_BLOCK,
53          DECOMPRESS_DATA,
54          FINISHED,
55          CORRUPTED
56      }
57  
58      private State currentState = State.INIT_BLOCK;
59  
60      /**
61       * Underlying decompressor in use.
62       */
63      private LZ4FastDecompressor decompressor;
64  
65      /**
66       * Underlying checksum calculator in use.
67       */
68      private ByteBufChecksum checksum;
69  
70      /**
71       * Type of current block.
72       */
73      private int blockType;
74  
75      /**
76       * Compressed length of current incoming block.
77       */
78      private int compressedLength;
79  
80      /**
81       * Decompressed length of current incoming block.
82       */
83      private int decompressedLength;
84  
85      /**
86       * Checksum value of current incoming block.
87       */
88      private int currentChecksum;
89  
90      /**
91       * Creates the fastest LZ4 decoder.
92       *
93       * Note that by default, validation of the checksum header in each chunk is
94       * DISABLED for performance improvements. If performance is less of an issue,
95       * or if you would prefer the safety that checksum validation brings, please
96       * use the {@link #Lz4FrameDecoder(boolean)} constructor with the argument
97       * set to {@code true}.
98       */
99      public Lz4FrameDecoder() {
100         this(false);
101     }
102 
103     /**
104      * Creates a LZ4 decoder with fastest decoder instance available on your machine.
105      *
106      * @param validateChecksums  if {@code true}, the checksum field will be validated against the actual
107      *                           uncompressed data, and if the checksums do not match, a suitable
108      *                           {@link DecompressionException} will be thrown
109      */
110     public Lz4FrameDecoder(boolean validateChecksums) {
111         this(LZ4Factory.fastestInstance(), validateChecksums);
112     }
113 
114     /**
115      * Creates a new LZ4 decoder with customizable implementation.
116      *
117      * @param factory            user customizable {@link LZ4Factory} instance
118      *                           which may be JNI bindings to the original C implementation, a pure Java implementation
119      *                           or a Java implementation that uses the {@link sun.misc.Unsafe}
120      * @param validateChecksums  if {@code true}, the checksum field will be validated against the actual
121      *                           uncompressed data, and if the checksums do not match, a suitable
122      *                           {@link DecompressionException} will be thrown. In this case encoder will use
123      *                           xxhash hashing for Java, based on Yann Collet's work available at
124      *                           <a href="https://github.com/Cyan4973/xxHash">Github</a>.
125      */
126     public Lz4FrameDecoder(LZ4Factory factory, boolean validateChecksums) {
127         this(factory, validateChecksums ?
128                 XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum()
129               : null);
130     }
131 
132     /**
133      * Creates a new customizable LZ4 decoder.
134      *
135      * @param factory   user customizable {@link LZ4Factory} instance
136      *                  which may be JNI bindings to the original C implementation, a pure Java implementation
137      *                  or a Java implementation that uses the {@link sun.misc.Unsafe}
138      * @param checksum  the {@link Checksum} instance to use to check data for integrity.
139      *                  You may set {@code null} if you do not want to validate checksum of each block
140      */
141     public Lz4FrameDecoder(LZ4Factory factory, Checksum checksum) {
142         if (factory == null) {
143             throw new NullPointerException("factory");
144         }
145         decompressor = factory.fastDecompressor();
146         this.checksum = checksum == null ? null : ByteBufChecksum.wrapChecksum(checksum);
147     }
148 
149     @Override
150     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
151         try {
152             switch (currentState) {
153             case INIT_BLOCK:
154                 if (in.readableBytes() < HEADER_LENGTH) {
155                     break;
156                 }
157                 final long magic = in.readLong();
158                 if (magic != MAGIC_NUMBER) {
159                     throw new DecompressionException("unexpected block identifier");
160                 }
161 
162                 final int token = in.readByte();
163                 final int compressionLevel = (token & 0x0F) + COMPRESSION_LEVEL_BASE;
164                 int blockType = token & 0xF0;
165 
166                 int compressedLength = Integer.reverseBytes(in.readInt());
167                 if (compressedLength < 0 || compressedLength > MAX_BLOCK_SIZE) {
168                     throw new DecompressionException(String.format(
169                             "invalid compressedLength: %d (expected: 0-%d)",
170                             compressedLength, MAX_BLOCK_SIZE));
171                 }
172 
173                 int decompressedLength = Integer.reverseBytes(in.readInt());
174                 final int maxDecompressedLength = 1 << compressionLevel;
175                 if (decompressedLength < 0 || decompressedLength > maxDecompressedLength) {
176                     throw new DecompressionException(String.format(
177                             "invalid decompressedLength: %d (expected: 0-%d)",
178                             decompressedLength, maxDecompressedLength));
179                 }
180                 if (decompressedLength == 0 && compressedLength != 0
181                         || decompressedLength != 0 && compressedLength == 0
182                         || blockType == BLOCK_TYPE_NON_COMPRESSED && decompressedLength != compressedLength) {
183                     throw new DecompressionException(String.format(
184                             "stream corrupted: compressedLength(%d) and decompressedLength(%d) mismatch",
185                             compressedLength, decompressedLength));
186                 }
187 
188                 int currentChecksum = Integer.reverseBytes(in.readInt());
189                 if (decompressedLength == 0 && compressedLength == 0) {
190                     if (currentChecksum != 0) {
191                         throw new DecompressionException("stream corrupted: checksum error");
192                     }
193                     currentState = State.FINISHED;
194                     decompressor = null;
195                     checksum = null;
196                     break;
197                 }
198 
199                 this.blockType = blockType;
200                 this.compressedLength = compressedLength;
201                 this.decompressedLength = decompressedLength;
202                 this.currentChecksum = currentChecksum;
203 
204                 currentState = State.DECOMPRESS_DATA;
205                 // fall through
206             case DECOMPRESS_DATA:
207                 blockType = this.blockType;
208                 compressedLength = this.compressedLength;
209                 decompressedLength = this.decompressedLength;
210                 currentChecksum = this.currentChecksum;
211 
212                 if (in.readableBytes() < compressedLength) {
213                     break;
214                 }
215 
216                 final ByteBufChecksum checksum = this.checksum;
217                 ByteBuf uncompressed = null;
218 
219                 try {
220                     switch (blockType) {
221                         case BLOCK_TYPE_NON_COMPRESSED:
222                             // Just pass through, we not update the readerIndex yet as we do this outside of the
223                             // switch statement.
224                             uncompressed = in.retainedSlice(in.readerIndex(), decompressedLength);
225                             break;
226                         case BLOCK_TYPE_COMPRESSED:
227                             uncompressed = ctx.alloc().buffer(decompressedLength, decompressedLength);
228 
229                             decompressor.decompress(CompressionUtil.safeNioBuffer(in),
230                                     uncompressed.internalNioBuffer(uncompressed.writerIndex(), decompressedLength));
231                             // Update the writerIndex now to reflect what we decompressed.
232                             uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength);
233                             break;
234                         default:
235                             throw new DecompressionException(String.format(
236                                     "unexpected blockType: %d (expected: %d or %d)",
237                                     blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
238                     }
239                     // Skip inbound bytes after we processed them.
240                     in.skipBytes(compressedLength);
241 
242                     if (checksum != null) {
243                         CompressionUtil.checkChecksum(checksum, uncompressed, currentChecksum);
244                     }
245                     out.add(uncompressed);
246                     uncompressed = null;
247                     currentState = State.INIT_BLOCK;
248                 } catch (LZ4Exception e) {
249                     throw new DecompressionException(e);
250                 } finally {
251                     if (uncompressed != null) {
252                         uncompressed.release();
253                     }
254                 }
255                 break;
256             case FINISHED:
257             case CORRUPTED:
258                 in.skipBytes(in.readableBytes());
259                 break;
260             default:
261                 throw new IllegalStateException();
262             }
263         } catch (Exception e) {
264             currentState = State.CORRUPTED;
265             throw e;
266         }
267     }
268 
269     /**
270      * Returns {@code true} if and only if the end of the compressed stream
271      * has been reached.
272      */
273     public boolean isClosed() {
274         return currentState == State.FINISHED;
275     }
276 }