1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.compression;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20
21 import java.util.function.Supplier;
22
23 import static io.netty5.handler.codec.compression.Bzip2Constants.BASE_BLOCK_SIZE;
24 import static io.netty5.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_1;
25 import static io.netty5.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_2;
26 import static io.netty5.handler.codec.compression.Bzip2Constants.MAGIC_NUMBER;
27 import static io.netty5.handler.codec.compression.Bzip2Constants.MAX_BLOCK_SIZE;
28 import static io.netty5.handler.codec.compression.Bzip2Constants.MIN_BLOCK_SIZE;
29
30
31
32
33
34
35 public final class Bzip2Compressor implements Compressor {
36
37
38
39
40
41
42
43
44 private Bzip2Compressor(final int blockSizeMultiplier) {
45 streamBlockSize = blockSizeMultiplier * BASE_BLOCK_SIZE;
46 }
47
48
49
50
51
52
53 public static Supplier<Bzip2Compressor> newFactory() {
54 return newFactory(MAX_BLOCK_SIZE);
55 }
56
57
58
59
60
61
62
63
64
65
66 public static Supplier<Bzip2Compressor> newFactory(final int blockSizeMultiplier) {
67 if (blockSizeMultiplier < MIN_BLOCK_SIZE || blockSizeMultiplier > MAX_BLOCK_SIZE) {
68 throw new IllegalArgumentException(
69 "blockSizeMultiplier: " + blockSizeMultiplier + " (expected: 1-9)");
70 }
71 return () -> new Bzip2Compressor(blockSizeMultiplier);
72 }
73
74
75
76
77 private enum State {
78 INIT,
79 INIT_BLOCK,
80 WRITE_DATA,
81 CLOSE_BLOCK
82 }
83
84 private State currentState = State.INIT;
85
86
87
88
89 private final Bzip2BitWriter writer = new Bzip2BitWriter();
90
91
92
93
94 private final int streamBlockSize;
95
96
97
98
99 private int streamCRC;
100
101
102
103
104 private Bzip2BlockCompressor blockCompressor;
105
106 private enum CompressorState {
107 PROCESSING,
108 FINISHED,
109 CLOSED
110 }
111
112 private CompressorState compressorState = CompressorState.PROCESSING;
113
114 @Override
115 public Buffer compress(Buffer in, BufferAllocator allocator) throws CompressionException {
116 switch (compressorState) {
117 case CLOSED:
118 throw new CompressionException("Compressor closed");
119 case FINISHED:
120 return allocator.allocate(0);
121 case PROCESSING:
122 return compressData(in, allocator);
123 default:
124 throw new IllegalStateException();
125 }
126 }
127
128 private Buffer compressData(Buffer in, BufferAllocator allocator) {
129 Buffer out = allocator.allocate(256);
130 for (;;) {
131 switch (currentState) {
132 case INIT:
133 out.ensureWritable(4);
134 out.writeMedium(MAGIC_NUMBER);
135 out.writeByte((byte) ('0' + streamBlockSize / BASE_BLOCK_SIZE));
136 currentState = State.INIT_BLOCK;
137
138 case INIT_BLOCK:
139 blockCompressor = new Bzip2BlockCompressor(writer, streamBlockSize);
140 currentState = State.WRITE_DATA;
141
142 case WRITE_DATA:
143 if (in.readableBytes() == 0) {
144 return out;
145 }
146 Bzip2BlockCompressor blockCompressor = this.blockCompressor;
147 final int length = Math.min(in.readableBytes(), blockCompressor.availableSize());
148 final int bytesWritten = blockCompressor.write(in, in.readerOffset(), length);
149 in.skipReadableBytes(bytesWritten);
150 if (!blockCompressor.isFull()) {
151 if (in.readableBytes() > 0) {
152 break;
153 } else {
154 return out;
155 }
156 }
157 currentState = State.CLOSE_BLOCK;
158
159 case CLOSE_BLOCK:
160 closeBlock(out);
161 currentState = State.INIT_BLOCK;
162 break;
163 default:
164 throw new IllegalStateException();
165 }
166 }
167 }
168
169
170
171
172 private void closeBlock(Buffer out) {
173 final Bzip2BlockCompressor blockCompressor = this.blockCompressor;
174 if (!blockCompressor.isEmpty()) {
175 blockCompressor.close(out);
176 final int blockCRC = blockCompressor.crc();
177 streamCRC = (streamCRC << 1 | streamCRC >>> 31) ^ blockCRC;
178 }
179 }
180
181 @Override
182 public Buffer finish(BufferAllocator allocator) {
183 switch (compressorState) {
184 case CLOSED:
185 throw new CompressionException("Compressor closed");
186 case FINISHED:
187 return allocator.allocate(0);
188 case PROCESSING:
189 compressorState = CompressorState.FINISHED;
190 final Buffer footer = allocator.allocate(256);
191 try {
192 closeBlock(footer);
193
194 final int streamCRC = this.streamCRC;
195 final Bzip2BitWriter writer = this.writer;
196 try {
197 writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_1);
198 writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_2);
199 writer.writeInt(footer, streamCRC);
200 writer.flush(footer);
201 } finally {
202 blockCompressor = null;
203 }
204 return footer;
205 } catch (Throwable cause) {
206 footer.close();
207 throw cause;
208 }
209 default:
210 throw new IllegalStateException();
211 }
212 }
213
214 @Override
215 public boolean isFinished() {
216 return compressorState != CompressorState.PROCESSING;
217 }
218
219 @Override
220 public boolean isClosed() {
221 return compressorState == CompressorState.CLOSED;
222 }
223
224 @Override
225 public void close() {
226 compressorState = CompressorState.CLOSED;
227 }
228 }