View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.compression;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  
21  import java.util.function.Supplier;
22  import java.util.zip.Adler32;
23  import java.util.zip.Checksum;
24  
25  import static io.netty5.handler.codec.compression.FastLz.BLOCK_TYPE_COMPRESSED;
26  import static io.netty5.handler.codec.compression.FastLz.BLOCK_WITH_CHECKSUM;
27  import static io.netty5.handler.codec.compression.FastLz.MAGIC_NUMBER;
28  
29  /**
30   * Uncompresses a {@link Buffer} encoded by {@link FastLzCompressor} using the FastLZ algorithm.
31   *
32   * See <a href="https://github.com/netty/netty/issues/2750">FastLZ format</a>.
33   */
34  public final class FastLzDecompressor implements Decompressor {
35      /**
36       * Current state of decompression.
37       */
38      private enum State {
39          INIT_BLOCK,
40          INIT_BLOCK_PARAMS,
41          DECOMPRESS_DATA,
42          DONE,
43          CORRUPTED,
44          CLOSED
45      }
46  
47      private State currentState = State.INIT_BLOCK;
48  
49      /**
50       * Underlying checksum calculator in use.
51       */
52      private final BufferChecksum checksum;
53  
54      /**
55       * Length of current received chunk of data.
56       */
57      private int chunkLength;
58  
59      /**
60       * Original of current received chunk of data.
61       * It is equal to {@link #chunkLength} for non compressed chunks.
62       */
63      private int originalLength;
64  
65      /**
66       * Indicates is this chunk compressed or not.
67       */
68      private boolean isCompressed;
69  
70      /**
71       * Indicates is this chunk has checksum or not.
72       */
73      private boolean hasChecksum;
74  
75      /**
76       * Checksum value of current received chunk of data which has checksum.
77       */
78      private int currentChecksum;
79  
80      private FastLzDecompressor(Checksum checksum) {
81          this.checksum = checksum == null ? null : new BufferChecksum(checksum);
82      }
83  
84      /**
85       * Creates the fastest FastLZ decompressor factory without checksum calculation.
86       *
87       * @return the factory.
88       */
89      public static Supplier<FastLzDecompressor> newFactory() {
90          return newFactory(false);
91      }
92  
93      /**
94       * Creates a FastLZ decompressor factory with calculation of checksums as specified.
95       *
96       * @param validateChecksums
97       *        If true, the checksum field will be validated against the actual
98       *        uncompressed data, and if the checksums do not match, a suitable
99       *        {@link DecompressionException} will be thrown.
100      *        Note, that in this case decoder will use {@link java.util.zip.Adler32}
101      *        as a default checksum calculator.
102      * @return the factory.
103      */
104     public static Supplier<FastLzDecompressor> newFactory(boolean validateChecksums) {
105         return newFactory(validateChecksums ? new Adler32() : null);
106     }
107 
108     /**
109      * Creates a FastLZ decompressor factory with specified checksum calculator.
110      *
111      * @param checksum
112      *        the {@link Checksum} instance to use to check data for integrity.
113      *        You may set {@code null} if you do not want to validate checksum of each block.
114      * @return the factory.
115      */
116     public static Supplier<FastLzDecompressor> newFactory(Checksum checksum) {
117         return () -> new FastLzDecompressor(checksum);
118     }
119 
120     @Override
121     public Buffer decompress(Buffer in, BufferAllocator allocator) throws DecompressionException {
122         switch (currentState) {
123             case CLOSED:
124                 throw new DecompressionException("Decompressor closed");
125             case DONE:
126             case CORRUPTED:
127                 return allocator.allocate(0);
128             case INIT_BLOCK:
129                 if (in.readableBytes() < 4) {
130                     return null;
131                 }
132 
133                 final int magic = in.readUnsignedMedium();
134                 if (magic != MAGIC_NUMBER) {
135                     streamCorrupted("unexpected block identifier");
136                 }
137 
138                 final byte options = in.readByte();
139                 isCompressed = (options & 0x01) == BLOCK_TYPE_COMPRESSED;
140                 hasChecksum = (options & 0x10) == BLOCK_WITH_CHECKSUM;
141 
142                 currentState = State.INIT_BLOCK_PARAMS;
143                 // fall through
144             case INIT_BLOCK_PARAMS:
145                 if (in.readableBytes() < 2 + (isCompressed ? 2 : 0) + (hasChecksum ? 4 : 0)) {
146                     return null;
147                 }
148                 currentChecksum = hasChecksum ? in.readInt() : 0;
149                 chunkLength = in.readUnsignedShort();
150                 originalLength = isCompressed ? in.readUnsignedShort() : chunkLength;
151 
152                 currentState = State.DECOMPRESS_DATA;
153                 // fall through
154             case DECOMPRESS_DATA:
155                 final int chunkLength = this.chunkLength;
156                 if (in.readableBytes() < chunkLength) {
157                     return null;
158                 }
159 
160                 final int idx = in.readerOffset();
161                 final int originalLength = this.originalLength;
162 
163                 Buffer output = null;
164                 try {
165                     if (isCompressed) {
166                         output = allocator.allocate(originalLength);
167                         int outputOffset = output.writerOffset();
168                         final int decompressedBytes = FastLz.decompress(in, idx, chunkLength,
169                                 output, outputOffset, originalLength);
170                         if (originalLength != decompressedBytes) {
171                             streamCorrupted(String.format(
172                                     "stream corrupted: originalLength(%d) and actual length(%d) mismatch",
173                                     originalLength, decompressedBytes));
174                         }
175                         output.skipWritableBytes(decompressedBytes);
176                         in.skipReadableBytes(chunkLength);
177                     } else {
178                         output = in.readSplit(chunkLength);
179                     }
180 
181                     final BufferChecksum checksum = this.checksum;
182                     if (hasChecksum && checksum != null) {
183                         checksum.reset();
184                         checksum.update(output, output.readerOffset(), output.readableBytes());
185                         final int checksumResult = (int) checksum.getValue();
186                         if (checksumResult != currentChecksum) {
187                             streamCorrupted(String.format(
188                                     "stream corrupted: mismatching checksum: %d (expected: %d)",
189                                     checksumResult, currentChecksum));
190                         }
191                     }
192 
193                     final Buffer data;
194                     if (output.readableBytes() > 0) {
195                         data = output;
196                         output = null;
197                     } else {
198                         data = null;
199                     }
200 
201                     currentState = State.INIT_BLOCK;
202                     return data;
203                 } finally {
204                     if (output != null) {
205                         output.close();
206                     }
207                 }
208             default:
209                 throw new IllegalStateException();
210         }
211     }
212 
213     @Override
214     public boolean isFinished() {
215         return currentState == State.DONE || currentState == State.CORRUPTED || currentState == State.CLOSED;
216     }
217 
218     @Override
219     public boolean isClosed() {
220         return currentState == State.CLOSED;
221     }
222 
223     @Override
224     public void close() {
225         currentState = State.CLOSED;
226     }
227 
228     private void streamCorrupted(String message) {
229         currentState = State.CORRUPTED;
230         throw new DecompressionException(message);
231     }
232 }