View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.compression;
17  
18  import com.ning.compress.BufferRecycler;
19  import com.ning.compress.lzf.ChunkEncoder;
20  import com.ning.compress.lzf.LZFChunk;
21  import com.ning.compress.lzf.LZFEncoder;
22  import com.ning.compress.lzf.util.ChunkEncoderFactory;
23  import io.netty5.buffer.api.Buffer;
24  import io.netty5.buffer.api.BufferAllocator;
25  
26  import java.util.function.Supplier;
27  
28  import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN;
29  
30  /**
31   * Compresses a {@link Buffer} using the LZF format.
32   * <p>
33   * See original <a href="http://oldhome.schmorp.de/marc/liblzf.html">LZF package</a>
34   * and <a href="https://github.com/ning/compress/wiki/LZFFormat">LZF format</a> for full description.
35   */
36  public final class LzfCompressor implements Compressor {
37  
38      /**
39       * Minimum block size ready for compression. Blocks with length
40       * less than {@link #MIN_BLOCK_TO_COMPRESS} will write as uncompressed.
41       */
42      private static final int MIN_BLOCK_TO_COMPRESS = 16;
43  
44      /**
45       * Compress threshold for LZF format. When the amount of input data is less than compressThreshold,
46       * we will construct an uncompressed output according to the LZF format.
47       * <p>
48       * When the value is less than {@see ChunkEncoder#MIN_BLOCK_TO_COMPRESS}, since LZF will not compress data
49       * that is less than {@see ChunkEncoder#MIN_BLOCK_TO_COMPRESS}, compressThreshold will not work.
50       */
51      private final int compressThreshold;
52  
53      /**
54       * Underlying decoder in use.
55       */
56      private final ChunkEncoder encoder;
57  
58      /**
59       * Object that handles details of buffer recycling.
60       */
61      private final BufferRecycler recycler;
62  
63      private enum State {
64          PROCESSING,
65          FINISHED,
66          CLOSED
67      }
68  
69      private State state = State.PROCESSING;
70  
71      /**
72       * Creates a new LZF compressor with specified settings.
73       *
74       * @param safeInstance          If {@code true} encoder will use {@link ChunkEncoder} that only uses standard
75       *                              JDK access methods, and should work on all Java platforms and JVMs.
76       *                              Otherwise encoder will try to use highly optimized {@link ChunkEncoder}
77       *                              implementation that uses Sun JDK's {@link sun.misc.Unsafe}
78       *                              class (which may be included by other JDK's as well).
79       * @param totalLength           Expected total length of content to compress; only matters for outgoing messages
80       *                              that is smaller than maximum chunk size (64k), to optimize encoding hash tables.
81       * @param compressThreshold     Compress threshold for LZF format. When the amount of input data is less than
82       *                              compressThreshold, we will construct an uncompressed output according
83       *                              to the LZF format.
84       */
85      private LzfCompressor(boolean safeInstance, int totalLength, int compressThreshold) {
86          this.compressThreshold = compressThreshold;
87  
88          this.encoder = safeInstance ?
89                  ChunkEncoderFactory.safeNonAllocatingInstance(totalLength)
90                  : ChunkEncoderFactory.optimalNonAllocatingInstance(totalLength);
91  
92          this.recycler = BufferRecycler.instance();
93      }
94  
95      /**
96       * Creates a new LZF compressor factory with the most optimal available methods for underlying data access.
97       * It will "unsafe" instance if one can be used on current JVM.
98       * It should be safe to call this constructor as implementations are dynamically loaded; however, on some
99       * non-standard platforms it may be necessary to use {@link #newFactory(boolean)} with {@code true} param.
100      * @return the factory.
101      */
102     public static Supplier<LzfCompressor> newFactory() {
103         return newFactory(false);
104     }
105 
106     /**
107      * Creates a new LZF compressor factory with specified encoding instance.
108      *
109      * @param safeInstance If {@code true} encoder will use {@link ChunkEncoder} that only uses
110      *                     standard JDK access methods, and should work on all Java platforms and JVMs.
111      *                     Otherwise encoder will try to use highly optimized {@link ChunkEncoder}
112      *                     implementation that uses Sun JDK's {@link sun.misc.Unsafe}
113      *                     class (which may be included by other JDK's as well).
114      * @return the factory.
115      */
116     public static Supplier<LzfCompressor> newFactory(boolean safeInstance) {
117         return newFactory(safeInstance, MAX_CHUNK_LEN);
118     }
119 
120     /**
121      * Creates a new LZF compressor factory with specified encoding instance and compressThreshold.
122      *
123      * @param safeInstance      If {@code true} encoder will use {@link ChunkEncoder} that only uses standard
124      *                          JDK access methods, and should work on all Java platforms and JVMs.
125      *                          Otherwise encoder will try to use highly optimized {@link ChunkEncoder}
126      *                          implementation that uses Sun JDK's {@link sun.misc.Unsafe}
127      *                          class (which may be included by other JDK's as well).
128      * @param totalLength       Expected total length of content to compress; only matters for outgoing messages
129      *                          that is smaller than maximum chunk size (64k), to optimize encoding hash tables.
130      * @return the factory.
131      */
132     public static Supplier<LzfCompressor> newFactory(boolean safeInstance, int totalLength) {
133         return newFactory(safeInstance, totalLength, LzfCompressor.MIN_BLOCK_TO_COMPRESS);
134     }
135 
136     /**
137      * Creates a new LZF compressor factory with specified total length of encoded chunk. You can configure it to encode
138      * your data flow more efficient if you know the average size of messages that you send.
139      *
140      * @param totalLength Expected total length of content to compress;
141      *                    only matters for outgoing messages that is smaller than maximum chunk size (64k),
142      *                    to optimize encoding hash tables.
143      * @return the factory.
144      */
145     public static Supplier<LzfCompressor> newFactory(int totalLength) {
146         return newFactory(false, totalLength);
147     }
148 
149     /**
150      * Creates a new LZF compressor factory with specified settings.
151      *
152      * @param safeInstance          If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK
153      *                              access methods, and should work on all Java platforms and JVMs.
154      *                              Otherwise encoder will try to use highly optimized {@link ChunkEncoder}
155      *                              implementation that uses Sun JDK's {@link sun.misc.Unsafe}
156      *                              class (which may be included by other JDK's as well).
157      * @param totalLength           Expected total length of content to compress; only matters for outgoing messages
158      *                              that is smaller than maximum chunk size (64k), to optimize encoding hash tables.
159      * @param compressThreshold     Compress threshold for LZF format. When the amount of input data is less than
160      *                              compressThreshold, we will construct an uncompressed output according
161      *                              to the LZF format.
162      * @return the factory.
163      */
164     public static Supplier<LzfCompressor> newFactory(boolean safeInstance, int totalLength, int compressThreshold) {
165         if (totalLength < MIN_BLOCK_TO_COMPRESS || totalLength > MAX_CHUNK_LEN) {
166             throw new IllegalArgumentException("totalLength: " + totalLength +
167                     " (expected: " + MIN_BLOCK_TO_COMPRESS + '-' + MAX_CHUNK_LEN + ')');
168         }
169 
170         if (compressThreshold < MIN_BLOCK_TO_COMPRESS) {
171             // not a suitable value.
172             throw new IllegalArgumentException("compressThreshold:" + compressThreshold +
173                     " expected >=" + MIN_BLOCK_TO_COMPRESS);
174         }
175         return () -> new LzfCompressor(safeInstance, totalLength, compressThreshold);
176     }
177 
178     @Override
179     public Buffer compress(Buffer in, BufferAllocator allocator) throws CompressionException {
180         switch (state) {
181             case CLOSED:
182                 throw new CompressionException("Compressor closed");
183             case FINISHED:
184                 return allocator.allocate(0);
185             case PROCESSING:
186                 if (in.readableBytes() == 0) {
187                     return allocator.allocate(0);
188                 }
189                 try (var readableIteration = in.forEachReadable()) {
190                     var readableComponent = readableIteration.first();
191                     final int length = readableComponent.readableBytes();
192                     final byte[] input;
193                     final int inputPtr;
194 
195                     if (readableComponent.hasReadableArray()) {
196                         input = readableComponent.readableArray();
197                         inputPtr = readableComponent.readableArrayOffset();
198                     } else {
199                         input = recycler.allocInputBuffer(length);
200                         readableComponent.readableBuffer().get(input, 0, length);
201                         inputPtr = 0;
202                     }
203 
204                     // Estimate may apparently under-count by one in some cases.
205                     final byte[] output = recycler.allocOutputBuffer(LZFEncoder.estimateMaxWorkspaceSize(length) + 1);
206                     try {
207                         final int outputLength;
208                         if (length >= compressThreshold) {
209                             // compress.
210                             outputLength = encodeCompress(input, inputPtr, length, output, 0);
211                         } else {
212                             // not compress.
213                             outputLength = encodeNonCompress(input, inputPtr, length, output, 0);
214                         }
215 
216                         readableComponent.skipReadableBytes(length);
217 
218                         if (!readableComponent.hasReadableArray()) {
219                             recycler.releaseInputBuffer(input);
220                         }
221                         return allocator.allocate(outputLength).writeBytes(output, 0, outputLength);
222                     } finally {
223                         recycler.releaseOutputBuffer(output);
224                     }
225                 }
226 
227             default:
228                 throw new IllegalStateException();
229         }
230     }
231 
232     private int encodeCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
233         return LZFEncoder.appendEncoded(encoder,
234                 input, inputPtr, length, output, outputPtr) - outputPtr;
235     }
236 
237     private static int lzfEncodeNonCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
238         int left = length;
239         int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
240         outputPtr = LZFChunk.appendNonCompressed(input, inputPtr, chunkLen, output, outputPtr);
241         left -= chunkLen;
242         if (left < 1) {
243             return outputPtr;
244         }
245         inputPtr += chunkLen;
246         do {
247             chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN);
248             outputPtr = LZFChunk.appendNonCompressed(input, inputPtr, chunkLen, output, outputPtr);
249             inputPtr += chunkLen;
250             left -= chunkLen;
251         } while (left > 0);
252         return outputPtr;
253     }
254 
255     /**
256      * Use lzf uncompressed format to encode a piece of input.
257      */
258     private static int encodeNonCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
259         return lzfEncodeNonCompress(input, inputPtr, length, output, outputPtr) - outputPtr;
260     }
261 
262     @Override
263     public Buffer finish(BufferAllocator allocator) {
264         switch (state) {
265             case CLOSED:
266                 throw new CompressionException("Compressor closed");
267             case FINISHED:
268             case PROCESSING:
269                 state = State.FINISHED;
270                 return allocator.allocate(0);
271             default:
272                 throw new IllegalStateException();
273         }
274     }
275 
276     @Override
277     public boolean isFinished() {
278         return state != State.PROCESSING;
279     }
280 
281     @Override
282     public boolean isClosed() {
283         return state == State.CLOSED;
284     }
285 
286     @Override
287     public void close() {
288         if (state != State.CLOSED) {
289             state = State.CLOSED;
290             encoder.close();
291         }
292     }
293 }