1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.compression;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20
21 import java.util.function.Supplier;
22 import java.util.zip.Adler32;
23 import java.util.zip.Checksum;
24
25 import static io.netty5.handler.codec.compression.FastLz.BLOCK_TYPE_COMPRESSED;
26 import static io.netty5.handler.codec.compression.FastLz.BLOCK_WITH_CHECKSUM;
27 import static io.netty5.handler.codec.compression.FastLz.MAGIC_NUMBER;
28
29
30
31
32
33
34 public final class FastLzDecompressor implements Decompressor {
35
36
37
38 private enum State {
39 INIT_BLOCK,
40 INIT_BLOCK_PARAMS,
41 DECOMPRESS_DATA,
42 DONE,
43 CORRUPTED,
44 CLOSED
45 }
46
47 private State currentState = State.INIT_BLOCK;
48
49
50
51
52 private final BufferChecksum checksum;
53
54
55
56
57 private int chunkLength;
58
59
60
61
62
63 private int originalLength;
64
65
66
67
68 private boolean isCompressed;
69
70
71
72
73 private boolean hasChecksum;
74
75
76
77
78 private int currentChecksum;
79
80 private FastLzDecompressor(Checksum checksum) {
81 this.checksum = checksum == null ? null : new BufferChecksum(checksum);
82 }
83
84
85
86
87
88
89 public static Supplier<FastLzDecompressor> newFactory() {
90 return newFactory(false);
91 }
92
93
94
95
96
97
98
99
100
101
102
103
104 public static Supplier<FastLzDecompressor> newFactory(boolean validateChecksums) {
105 return newFactory(validateChecksums ? new Adler32() : null);
106 }
107
108
109
110
111
112
113
114
115
116 public static Supplier<FastLzDecompressor> newFactory(Checksum checksum) {
117 return () -> new FastLzDecompressor(checksum);
118 }
119
120 @Override
121 public Buffer decompress(Buffer in, BufferAllocator allocator) throws DecompressionException {
122 switch (currentState) {
123 case CLOSED:
124 throw new DecompressionException("Decompressor closed");
125 case DONE:
126 case CORRUPTED:
127 return allocator.allocate(0);
128 case INIT_BLOCK:
129 if (in.readableBytes() < 4) {
130 return null;
131 }
132
133 final int magic = in.readUnsignedMedium();
134 if (magic != MAGIC_NUMBER) {
135 streamCorrupted("unexpected block identifier");
136 }
137
138 final byte options = in.readByte();
139 isCompressed = (options & 0x01) == BLOCK_TYPE_COMPRESSED;
140 hasChecksum = (options & 0x10) == BLOCK_WITH_CHECKSUM;
141
142 currentState = State.INIT_BLOCK_PARAMS;
143
144 case INIT_BLOCK_PARAMS:
145 if (in.readableBytes() < 2 + (isCompressed ? 2 : 0) + (hasChecksum ? 4 : 0)) {
146 return null;
147 }
148 currentChecksum = hasChecksum ? in.readInt() : 0;
149 chunkLength = in.readUnsignedShort();
150 originalLength = isCompressed ? in.readUnsignedShort() : chunkLength;
151
152 currentState = State.DECOMPRESS_DATA;
153
154 case DECOMPRESS_DATA:
155 final int chunkLength = this.chunkLength;
156 if (in.readableBytes() < chunkLength) {
157 return null;
158 }
159
160 final int idx = in.readerOffset();
161 final int originalLength = this.originalLength;
162
163 Buffer output = null;
164 try {
165 if (isCompressed) {
166 output = allocator.allocate(originalLength);
167 int outputOffset = output.writerOffset();
168 final int decompressedBytes = FastLz.decompress(in, idx, chunkLength,
169 output, outputOffset, originalLength);
170 if (originalLength != decompressedBytes) {
171 streamCorrupted(String.format(
172 "stream corrupted: originalLength(%d) and actual length(%d) mismatch",
173 originalLength, decompressedBytes));
174 }
175 output.skipWritableBytes(decompressedBytes);
176 in.skipReadableBytes(chunkLength);
177 } else {
178 output = in.readSplit(chunkLength);
179 }
180
181 final BufferChecksum checksum = this.checksum;
182 if (hasChecksum && checksum != null) {
183 checksum.reset();
184 checksum.update(output, output.readerOffset(), output.readableBytes());
185 final int checksumResult = (int) checksum.getValue();
186 if (checksumResult != currentChecksum) {
187 streamCorrupted(String.format(
188 "stream corrupted: mismatching checksum: %d (expected: %d)",
189 checksumResult, currentChecksum));
190 }
191 }
192
193 final Buffer data;
194 if (output.readableBytes() > 0) {
195 data = output;
196 output = null;
197 } else {
198 data = null;
199 }
200
201 currentState = State.INIT_BLOCK;
202 return data;
203 } finally {
204 if (output != null) {
205 output.close();
206 }
207 }
208 default:
209 throw new IllegalStateException();
210 }
211 }
212
213 @Override
214 public boolean isFinished() {
215 return currentState == State.DONE || currentState == State.CORRUPTED || currentState == State.CLOSED;
216 }
217
218 @Override
219 public boolean isClosed() {
220 return currentState == State.CLOSED;
221 }
222
223 @Override
224 public void close() {
225 currentState = State.CLOSED;
226 }
227
228 private void streamCorrupted(String message) {
229 currentState = State.CORRUPTED;
230 throw new DecompressionException(message);
231 }
232 }