1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.compression;
17
18 import com.ning.compress.BufferRecycler;
19 import com.ning.compress.lzf.ChunkEncoder;
20 import com.ning.compress.lzf.LZFChunk;
21 import com.ning.compress.lzf.LZFEncoder;
22 import com.ning.compress.lzf.util.ChunkEncoderFactory;
23 import io.netty5.buffer.api.Buffer;
24 import io.netty5.buffer.api.BufferAllocator;
25
26 import java.util.function.Supplier;
27
28 import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN;
29
30
31
32
33
34
35
36 public final class LzfCompressor implements Compressor {
37
38
39
40
41
42 private static final int MIN_BLOCK_TO_COMPRESS = 16;
43
44
45
46
47
48
49
50
51 private final int compressThreshold;
52
53
54
55
56 private final ChunkEncoder encoder;
57
58
59
60
61 private final BufferRecycler recycler;
62
63 private enum State {
64 PROCESSING,
65 FINISHED,
66 CLOSED
67 }
68
69 private State state = State.PROCESSING;
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 private LzfCompressor(boolean safeInstance, int totalLength, int compressThreshold) {
86 this.compressThreshold = compressThreshold;
87
88 this.encoder = safeInstance ?
89 ChunkEncoderFactory.safeNonAllocatingInstance(totalLength)
90 : ChunkEncoderFactory.optimalNonAllocatingInstance(totalLength);
91
92 this.recycler = BufferRecycler.instance();
93 }
94
95
96
97
98
99
100
101
102 public static Supplier<LzfCompressor> newFactory() {
103 return newFactory(false);
104 }
105
106
107
108
109
110
111
112
113
114
115
116 public static Supplier<LzfCompressor> newFactory(boolean safeInstance) {
117 return newFactory(safeInstance, MAX_CHUNK_LEN);
118 }
119
120
121
122
123
124
125
126
127
128
129
130
131
132 public static Supplier<LzfCompressor> newFactory(boolean safeInstance, int totalLength) {
133 return newFactory(safeInstance, totalLength, LzfCompressor.MIN_BLOCK_TO_COMPRESS);
134 }
135
136
137
138
139
140
141
142
143
144
145 public static Supplier<LzfCompressor> newFactory(int totalLength) {
146 return newFactory(false, totalLength);
147 }
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164 public static Supplier<LzfCompressor> newFactory(boolean safeInstance, int totalLength, int compressThreshold) {
165 if (totalLength < MIN_BLOCK_TO_COMPRESS || totalLength > MAX_CHUNK_LEN) {
166 throw new IllegalArgumentException("totalLength: " + totalLength +
167 " (expected: " + MIN_BLOCK_TO_COMPRESS + '-' + MAX_CHUNK_LEN + ')');
168 }
169
170 if (compressThreshold < MIN_BLOCK_TO_COMPRESS) {
171
172 throw new IllegalArgumentException("compressThreshold:" + compressThreshold +
173 " expected >=" + MIN_BLOCK_TO_COMPRESS);
174 }
175 return () -> new LzfCompressor(safeInstance, totalLength, compressThreshold);
176 }
177
178 @Override
179 public Buffer compress(Buffer in, BufferAllocator allocator) throws CompressionException {
180 switch (state) {
181 case CLOSED:
182 throw new CompressionException("Compressor closed");
183 case FINISHED:
184 return allocator.allocate(0);
185 case PROCESSING:
186 if (in.readableBytes() == 0) {
187 return allocator.allocate(0);
188 }
189 try (var readableIteration = in.forEachReadable()) {
190 var readableComponent = readableIteration.first();
191 final int length = readableComponent.readableBytes();
192 final byte[] input;
193 final int inputPtr;
194
195 if (readableComponent.hasReadableArray()) {
196 input = readableComponent.readableArray();
197 inputPtr = readableComponent.readableArrayOffset();
198 } else {
199 input = recycler.allocInputBuffer(length);
200 readableComponent.readableBuffer().get(input, 0, length);
201 inputPtr = 0;
202 }
203
204
205 final byte[] output = recycler.allocOutputBuffer(LZFEncoder.estimateMaxWorkspaceSize(length) + 1);
206 try {
207 final int outputLength;
208 if (length >= compressThreshold) {
209
210 outputLength = encodeCompress(input, inputPtr, length, output, 0);
211 } else {
212
213 outputLength = encodeNonCompress(input, inputPtr, length, output, 0);
214 }
215
216 readableComponent.skipReadableBytes(length);
217
218 if (!readableComponent.hasReadableArray()) {
219 recycler.releaseInputBuffer(input);
220 }
221 return allocator.allocate(outputLength).writeBytes(output, 0, outputLength);
222 } finally {
223 recycler.releaseOutputBuffer(output);
224 }
225 }
226
227 default:
228 throw new IllegalStateException();
229 }
230 }
231
232 private int encodeCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
233 return LZFEncoder.appendEncoded(encoder,
234 input, inputPtr, length, output, outputPtr) - outputPtr;
235 }
236
237 private static int lzfEncodeNonCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
238 int left = length;
239 int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
240 outputPtr = LZFChunk.appendNonCompressed(input, inputPtr, chunkLen, output, outputPtr);
241 left -= chunkLen;
242 if (left < 1) {
243 return outputPtr;
244 }
245 inputPtr += chunkLen;
246 do {
247 chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN);
248 outputPtr = LZFChunk.appendNonCompressed(input, inputPtr, chunkLen, output, outputPtr);
249 inputPtr += chunkLen;
250 left -= chunkLen;
251 } while (left > 0);
252 return outputPtr;
253 }
254
255
256
257
258 private static int encodeNonCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
259 return lzfEncodeNonCompress(input, inputPtr, length, output, outputPtr) - outputPtr;
260 }
261
262 @Override
263 public Buffer finish(BufferAllocator allocator) {
264 switch (state) {
265 case CLOSED:
266 throw new CompressionException("Compressor closed");
267 case FINISHED:
268 case PROCESSING:
269 state = State.FINISHED;
270 return allocator.allocate(0);
271 default:
272 throw new IllegalStateException();
273 }
274 }
275
276 @Override
277 public boolean isFinished() {
278 return state != State.PROCESSING;
279 }
280
281 @Override
282 public boolean isClosed() {
283 return state == State.CLOSED;
284 }
285
286 @Override
287 public void close() {
288 if (state != State.CLOSED) {
289 state = State.CLOSED;
290 encoder.close();
291 }
292 }
293 }