1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty5.handler.codec.compression;
18
19 import io.netty5.buffer.api.Buffer;
20 import io.netty5.buffer.api.BufferAllocator;
21 import io.netty5.handler.codec.EncoderException;
22 import io.netty5.util.internal.ObjectUtil;
23 import net.jpountz.lz4.LZ4Compressor;
24 import net.jpountz.lz4.LZ4Exception;
25 import net.jpountz.lz4.LZ4Factory;
26
27 import java.nio.ByteBuffer;
28 import java.util.function.Supplier;
29 import java.util.zip.Checksum;
30
31 import static io.netty5.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
32 import static io.netty5.handler.codec.compression.Lz4Constants.BLOCK_TYPE_NON_COMPRESSED;
33 import static io.netty5.handler.codec.compression.Lz4Constants.CHECKSUM_OFFSET;
34 import static io.netty5.handler.codec.compression.Lz4Constants.COMPRESSED_LENGTH_OFFSET;
35 import static io.netty5.handler.codec.compression.Lz4Constants.COMPRESSION_LEVEL_BASE;
36 import static io.netty5.handler.codec.compression.Lz4Constants.DECOMPRESSED_LENGTH_OFFSET;
37 import static io.netty5.handler.codec.compression.Lz4Constants.DEFAULT_BLOCK_SIZE;
38 import static io.netty5.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
39 import static io.netty5.handler.codec.compression.Lz4Constants.HEADER_LENGTH;
40 import static io.netty5.handler.codec.compression.Lz4Constants.MAGIC_NUMBER;
41 import static io.netty5.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
42 import static io.netty5.handler.codec.compression.Lz4Constants.MIN_BLOCK_SIZE;
43 import static io.netty5.handler.codec.compression.Lz4Constants.TOKEN_OFFSET;
44 import static java.util.Objects.requireNonNull;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 public final class Lz4Compressor implements Compressor {
63 static final int DEFAULT_MAX_ENCODE_SIZE = Integer.MAX_VALUE;
64
65 private final int blockSize;
66
67
68
69
70 private final LZ4Compressor compressor;
71
72
73
74
75 private final BufferChecksum checksum;
76
77
78
79
80 private final int compressionLevel;
81
82
83
84
85 private final int maxEncodeSize;
86
87 private enum State {
88 PROCESSING,
89 FINISHED,
90 CLOSED
91 }
92
93 private State state = State.PROCESSING;
94
95
96
97
98
99
100 public static Supplier<Lz4Compressor> newFactory() {
101 return newFactory(false);
102 }
103
104
105
106
107
108
109
110
111
112
113 public static Supplier<Lz4Compressor> newFactory(boolean highCompressor) {
114 return newFactory(LZ4Factory.fastestInstance(), highCompressor,
115 DEFAULT_BLOCK_SIZE, new Lz4XXHash32(DEFAULT_SEED));
116 }
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 public static Supplier<Lz4Compressor> newFactory(LZ4Factory factory, boolean highCompressor,
132 int blockSize, Checksum checksum) {
133 return newFactory(factory, highCompressor, blockSize, checksum, DEFAULT_MAX_ENCODE_SIZE);
134 }
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150 public static Supplier<Lz4Compressor> newFactory(LZ4Factory factory, boolean highCompressor, int blockSize,
151 Checksum checksum, int maxEncodeSize) {
152 requireNonNull(factory, "factory");
153 requireNonNull(checksum, "checksum");
154 ObjectUtil.checkPositive(blockSize, "blockSize");
155 ObjectUtil.checkPositive(maxEncodeSize, "maxEncodeSize");
156 return () -> new Lz4Compressor(factory, highCompressor, blockSize, checksum, maxEncodeSize);
157 }
158
159 private Lz4Compressor(LZ4Factory factory, boolean highCompressor, int blockSize,
160 Checksum checksum, int maxEncodeSize) {
161 compressor = highCompressor ? factory.highCompressor() : factory.fastCompressor();
162 this.checksum = checksum == null ? null : checksum instanceof Lz4XXHash32 ? (Lz4XXHash32) checksum :
163 new BufferChecksum(checksum);
164
165 compressionLevel = compressionLevel(blockSize);
166 this.blockSize = blockSize;
167 this.maxEncodeSize = maxEncodeSize;
168 }
169
170
171
172
173 private static int compressionLevel(int blockSize) {
174 if (blockSize < MIN_BLOCK_SIZE || blockSize > MAX_BLOCK_SIZE) {
175 throw new IllegalArgumentException(String.format(
176 "blockSize: %d (expected: %d-%d)", blockSize, MIN_BLOCK_SIZE, MAX_BLOCK_SIZE));
177 }
178 int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1);
179 compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
180 return compressionLevel;
181 }
182
183 private Buffer allocateBuffer(BufferAllocator allocator, Buffer msg) {
184 int targetBufSize = 0;
185 int remaining = msg.readableBytes();
186
187
188 if (remaining < 0) {
189 throw new EncoderException("too much data to allocate a buffer for compression");
190 }
191
192 while (remaining > 0) {
193 int curSize = Math.min(blockSize, remaining);
194 remaining -= curSize;
195
196 targetBufSize += compressor.maxCompressedLength(curSize) + HEADER_LENGTH;
197 }
198
199
200
201
202 if (targetBufSize > maxEncodeSize || 0 > targetBufSize) {
203 throw new EncoderException(String.format("requested encode buffer size (%d bytes) exceeds the maximum " +
204 "allowable size (%d bytes)", targetBufSize, maxEncodeSize));
205 }
206
207 return allocator.allocate(targetBufSize);
208 }
209
210 @Override
211 public Buffer compress(Buffer input, BufferAllocator allocator) throws CompressionException {
212 switch (state) {
213 case CLOSED:
214 throw new CompressionException("Compressor closed");
215 case FINISHED:
216 return allocator.allocate(0);
217 case PROCESSING:
218 if (input.readableBytes() == 0) {
219 return allocator.allocate(0);
220 }
221
222 Buffer out = allocateBuffer(allocator, input);
223 try {
224
225
226 while (input.readableBytes() > 0) {
227 compressData(input, out);
228 }
229 } catch (Throwable cause) {
230 out.close();
231 throw cause;
232 }
233 return out;
234 default:
235 throw new IllegalStateException();
236 }
237 }
238
239 @Override
240 public Buffer finish(BufferAllocator allocator) {
241 switch (state) {
242 case CLOSED:
243 throw new CompressionException("Compressor closed");
244 case FINISHED:
245 case PROCESSING:
246 state = State.FINISHED;
247
248 final Buffer footer = allocator.allocate(HEADER_LENGTH);
249 footer.ensureWritable(HEADER_LENGTH);
250 final int idx = footer.writerOffset();
251 footer.setLong(idx, MAGIC_NUMBER);
252 footer.setByte(idx + TOKEN_OFFSET, (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel));
253 footer.setInt(idx + COMPRESSED_LENGTH_OFFSET, 0);
254 footer.setInt(idx + DECOMPRESSED_LENGTH_OFFSET, 0);
255 footer.setInt(idx + CHECKSUM_OFFSET, 0);
256
257 footer.skipWritableBytes(HEADER_LENGTH);
258 return footer;
259 default:
260 throw new IllegalStateException();
261 }
262 }
263
264 @Override
265 public boolean isFinished() {
266 return state != State.PROCESSING;
267 }
268
269 @Override
270 public boolean isClosed() {
271 return state == State.CLOSED;
272 }
273
274 @Override
275 public void close() {
276 state = State.CLOSED;
277 }
278
279
280
281
282
283 private void compressData(Buffer in, Buffer out) {
284 int inReaderIndex = in.readerOffset();
285 int flushableBytes = Math.min(in.readableBytes(), blockSize);
286 assert flushableBytes > 0;
287 checksum.reset();
288 checksum.update(in, inReaderIndex, flushableBytes);
289 final int check = (int) checksum.getValue();
290
291 final int bufSize = compressor.maxCompressedLength(flushableBytes) + HEADER_LENGTH;
292 out.ensureWritable(bufSize);
293 final int idx = out.writerOffset();
294 int compressedLength = 0;
295 try {
296 out.skipWritableBytes(HEADER_LENGTH);
297 assert out.countWritableComponents() == 1;
298 int readable = flushableBytes;
299
300 try (var writableIteration = out.forEachWritable()) {
301 var writableComponent = writableIteration.first();
302 try (var readableIteration = in.forEachReadable()) {
303 for (var readableComponent = readableIteration.first();
304 readableComponent != null; readableComponent = readableComponent.next()) {
305 ByteBuffer outNioBuffer = writableComponent.writableBuffer();
306 int pos = outNioBuffer.position();
307 ByteBuffer inNioBuffer = readableComponent.readableBuffer();
308 if (inNioBuffer.remaining() > readable) {
309 inNioBuffer.limit(inNioBuffer.position() + readable);
310 compressor.compress(inNioBuffer, outNioBuffer);
311 compressedLength += outNioBuffer.position() - pos;
312 break;
313 } else {
314 readable -= inNioBuffer.remaining();
315 compressor.compress(inNioBuffer, outNioBuffer);
316 compressedLength += outNioBuffer.position() - pos;
317 }
318 }
319 }
320 }
321 } catch (LZ4Exception e) {
322 throw new CompressionException(e);
323 } finally {
324 out.writerOffset(idx);
325 }
326 final int blockType;
327 if (compressedLength >= flushableBytes) {
328 blockType = BLOCK_TYPE_NON_COMPRESSED;
329 compressedLength = flushableBytes;
330 in.copyInto(inReaderIndex, out, idx + HEADER_LENGTH, flushableBytes);
331 } else {
332 blockType = BLOCK_TYPE_COMPRESSED;
333 }
334
335 out.setLong(idx, MAGIC_NUMBER);
336 out.setByte(idx + TOKEN_OFFSET, (byte) (blockType | compressionLevel));
337 out.setInt(idx + COMPRESSED_LENGTH_OFFSET, Integer.reverseBytes(compressedLength));
338 out.setInt(idx + DECOMPRESSED_LENGTH_OFFSET, Integer.reverseBytes(flushableBytes));
339 out.setInt(idx + CHECKSUM_OFFSET, Integer.reverseBytes(check));
340 out.writerOffset(idx + HEADER_LENGTH + compressedLength);
341
342 in.readerOffset(inReaderIndex + flushableBytes);
343 }
344 }