View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.ByteToMessageDecoder;
21  import io.netty.util.internal.EmptyArrays;
22  
23  import java.util.List;
24  import java.util.zip.Adler32;
25  import java.util.zip.Checksum;
26  
27  import static io.netty.handler.codec.compression.FastLz.*;
28  
29  /**
30   * Uncompresses a {@link ByteBuf} encoded by {@link FastLzFrameEncoder} using the FastLZ algorithm.
31   *
32   * See <a href="https://github.com/netty/netty/issues/2750">FastLZ format</a>.
33   */
34  public class FastLzFrameDecoder extends ByteToMessageDecoder {
35      /**
36       * Current state of decompression.
37       */
38      private enum State {
39          INIT_BLOCK,
40          INIT_BLOCK_PARAMS,
41          DECOMPRESS_DATA,
42          CORRUPTED
43      }
44  
45      private State currentState = State.INIT_BLOCK;
46  
47      /**
48       * Underlying checksum calculator in use.
49       */
50      private final Checksum checksum;
51  
52      /**
53       * Length of current received chunk of data.
54       */
55      private int chunkLength;
56  
57      /**
58       * Original of current received chunk of data.
59       * It is equal to {@link #chunkLength} for non compressed chunks.
60       */
61      private int originalLength;
62  
63      /**
64       * Indicates is this chunk compressed or not.
65       */
66      private boolean isCompressed;
67  
68      /**
69       * Indicates is this chunk has checksum or not.
70       */
71      private boolean hasChecksum;
72  
73      /**
74       * Chechsum value of current received chunk of data which has checksum.
75       */
76      private int currentChecksum;
77  
78      /**
79       * Creates the fastest FastLZ decoder without checksum calculation.
80       */
81      public FastLzFrameDecoder() {
82          this(false);
83      }
84  
85      /**
86       * Creates a FastLZ decoder with calculation of checksums as specified.
87       *
88       * @param validateChecksums
89       *        If true, the checksum field will be validated against the actual
90       *        uncompressed data, and if the checksums do not match, a suitable
91       *        {@link DecompressionException} will be thrown.
92       *        Note, that in this case decoder will use {@link java.util.zip.Adler32}
93       *        as a default checksum calculator.
94       */
95      public FastLzFrameDecoder(boolean validateChecksums) {
96          this(validateChecksums ? new Adler32() : null);
97      }
98  
99      /**
100      * Creates a FastLZ decoder with specified checksum calculator.
101      *
102      * @param checksum
103      *        the {@link Checksum} instance to use to check data for integrity.
104      *        You may set {@code null} if you do not want to validate checksum of each block.
105      */
106     public FastLzFrameDecoder(Checksum checksum) {
107         this.checksum = checksum;
108     }
109 
110     @Override
111     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
112         try {
113             switch (currentState) {
114             case INIT_BLOCK:
115                 if (in.readableBytes() < 4) {
116                     break;
117                 }
118 
119                 final int magic = in.readUnsignedMedium();
120                 if (magic != MAGIC_NUMBER) {
121                     throw new DecompressionException("unexpected block identifier");
122                 }
123 
124                 final byte options = in.readByte();
125                 isCompressed = (options & 0x01) == BLOCK_TYPE_COMPRESSED;
126                 hasChecksum = (options & 0x10) == BLOCK_WITH_CHECKSUM;
127 
128                 currentState = State.INIT_BLOCK_PARAMS;
129             case INIT_BLOCK_PARAMS:
130                 if (in.readableBytes() < 2 + (isCompressed ? 2 : 0) + (hasChecksum ? 4 : 0)) {
131                     break;
132                 }
133                 currentChecksum = hasChecksum ? in.readInt() : 0;
134                 chunkLength = in.readUnsignedShort();
135                 originalLength = isCompressed ? in.readUnsignedShort() : chunkLength;
136 
137                 currentState = State.DECOMPRESS_DATA;
138             case DECOMPRESS_DATA:
139                 final int chunkLength = this.chunkLength;
140                 if (in.readableBytes() < chunkLength) {
141                     break;
142                 }
143 
144                 final int idx = in.readerIndex();
145                 final int originalLength = this.originalLength;
146 
147                 final ByteBuf uncompressed;
148                 final byte[] output;
149                 final int outputPtr;
150 
151                 if (originalLength != 0) {
152                     uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength);
153                     output = uncompressed.array();
154                     outputPtr = uncompressed.arrayOffset() + uncompressed.writerIndex();
155                 } else {
156                     uncompressed = null;
157                     output = EmptyArrays.EMPTY_BYTES;
158                     outputPtr = 0;
159                 }
160 
161                 boolean success = false;
162                 try {
163                     if (isCompressed) {
164                         final byte[] input;
165                         final int inputPtr;
166                         if (in.hasArray()) {
167                             input = in.array();
168                             inputPtr = in.arrayOffset() + idx;
169                         } else {
170                             input = new byte[chunkLength];
171                             in.getBytes(idx, input);
172                             inputPtr = 0;
173                         }
174 
175                         final int decompressedBytes = decompress(input, inputPtr, chunkLength,
176                                 output, outputPtr, originalLength);
177                         if (originalLength != decompressedBytes) {
178                             throw new DecompressionException(String.format(
179                                     "stream corrupted: originalLength(%d) and actual length(%d) mismatch",
180                                     originalLength, decompressedBytes));
181                         }
182                     } else {
183                         in.getBytes(idx, output, outputPtr, chunkLength);
184                     }
185 
186                     final Checksum checksum = this.checksum;
187                     if (hasChecksum && checksum != null) {
188                         checksum.reset();
189                         checksum.update(output, outputPtr, originalLength);
190                         final int checksumResult = (int) checksum.getValue();
191                         if (checksumResult != currentChecksum) {
192                             throw new DecompressionException(String.format(
193                                     "stream corrupted: mismatching checksum: %d (expected: %d)",
194                                     checksumResult, currentChecksum));
195                         }
196                     }
197 
198                     if (uncompressed != null) {
199                         uncompressed.writerIndex(uncompressed.writerIndex() + originalLength);
200                         out.add(uncompressed);
201                     }
202                     in.skipBytes(chunkLength);
203 
204                     currentState = State.INIT_BLOCK;
205                     success = true;
206                 } finally {
207                     if (!success) {
208                         uncompressed.release();
209                     }
210                 }
211                 break;
212             case CORRUPTED:
213                 in.skipBytes(in.readableBytes());
214                 break;
215             default:
216                 throw new IllegalStateException();
217             }
218         } catch (Exception e) {
219             currentState = State.CORRUPTED;
220             throw e;
221         }
222     }
223 }