1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.compression;
17
18 import io.netty5.buffer.BufferUtil;
19 import io.netty5.buffer.api.Buffer;
20 import io.netty5.buffer.api.BufferAllocator;
21
22 import java.util.function.Supplier;
23
24 import static io.netty5.handler.codec.compression.Snappy.calculateChecksum;
25
26
27
28
29
30
31 public final class SnappyCompressor implements Compressor {
32 private enum State {
33 Init,
34 Started,
35 Finished,
36 Closed
37 }
38
39
40
41
42
43
44 private static final int MIN_COMPRESSIBLE_LENGTH = 18;
45
46
47
48
49
50 private static final byte[] STREAM_START = {
51 (byte) 0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
52 };
53
54 private final Snappy snappy = new Snappy();
55 private State state = State.Init;
56
57 private SnappyCompressor() { }
58
59
60
61
62
63
64 public static Supplier<SnappyCompressor> newFactory() {
65 return SnappyCompressor::new;
66 }
67
68 @Override
69 public Buffer compress(Buffer in, BufferAllocator allocator) throws CompressionException {
70 switch (state) {
71 case Finished:
72 return allocator.allocate(0);
73 case Closed:
74 throw new CompressionException("Compressor closed");
75 default:
76
77 Buffer out = allocator.allocate(256);
78 try {
79 if (state == State.Init) {
80 state = State.Started;
81 out.writeBytes(STREAM_START);
82 } else if (state != State.Started) {
83 throw new IllegalStateException();
84 }
85
86 int dataLength = in.readableBytes();
87 if (dataLength > MIN_COMPRESSIBLE_LENGTH) {
88 for (;;) {
89 final int lengthIdx = out.writerOffset() + 1;
90 if (dataLength < MIN_COMPRESSIBLE_LENGTH) {
91 try (Buffer slice = in.readSplit(dataLength)) {
92 writeUnencodedChunk(slice, out, dataLength);
93 break;
94 }
95 }
96
97 out.writeInt(0);
98 if (dataLength > Short.MAX_VALUE) {
99 try (Buffer slice = in.readSplit(Short.MAX_VALUE)) {
100 calculateAndWriteChecksum(slice, out);
101 snappy.encode(slice, out, Short.MAX_VALUE);
102 setChunkLength(out, lengthIdx);
103 dataLength -= Short.MAX_VALUE;
104 }
105 } else {
106 try (Buffer slice = in.readSplit(dataLength)) {
107 calculateAndWriteChecksum(slice, out);
108 snappy.encode(slice, out, dataLength);
109 setChunkLength(out, lengthIdx);
110 break;
111 }
112 }
113 }
114 } else {
115 writeUnencodedChunk(in, out, dataLength);
116 }
117 return out;
118 } catch (Throwable cause) {
119 out.close();
120 throw cause;
121 }
122 }
123 }
124
125 @Override
126 public Buffer finish(BufferAllocator allocator) {
127 switch (state) {
128 case Closed:
129 throw new CompressionException("Compressor closed");
130 case Finished:
131 case Init:
132 case Started:
133 state = State.Finished;
134 return allocator.allocate(0);
135 default:
136 throw new IllegalStateException();
137 }
138 }
139
140 @Override
141 public boolean isFinished() {
142 switch (state) {
143 case Finished:
144 case Closed:
145 return true;
146 default:
147 return false;
148 }
149 }
150
151 @Override
152 public boolean isClosed() {
153 return state == State.Closed;
154 }
155
156 @Override
157 public void close() {
158 state = State.Closed;
159 }
160
161 private static void writeUnencodedChunk(Buffer in, Buffer out, int dataLength) {
162 out.writeByte((byte) 1);
163 writeChunkLength(out, dataLength + 4);
164 calculateAndWriteChecksum(in, out);
165 in.copyInto(in.readerOffset(), out, out.writerOffset(), dataLength);
166 in.skipReadableBytes(dataLength);
167 out.skipWritableBytes(dataLength);
168 }
169
170 private static void setChunkLength(Buffer out, int lengthIdx) {
171 int chunkLength = out.writerOffset() - lengthIdx - 3;
172 if (chunkLength >>> 24 != 0) {
173 throw new CompressionException("compressed data too large: " + chunkLength);
174 }
175 out.setMedium(lengthIdx, BufferUtil.reverseMedium(chunkLength));
176 }
177
178
179
180
181
182
183
184 private static void writeChunkLength(Buffer out, int chunkLength) {
185 out.writeMedium(BufferUtil.reverseMedium(chunkLength));
186 }
187
188
189
190
191
192
193
194 private static void calculateAndWriteChecksum(Buffer slice, Buffer out) {
195 out.writeInt(Integer.reverseBytes(calculateChecksum(slice)));
196 }
197 }