1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import com.ning.compress.BufferRecycler;
19 import com.ning.compress.lzf.ChunkEncoder;
20 import com.ning.compress.lzf.LZFChunk;
21 import com.ning.compress.lzf.LZFEncoder;
22 import com.ning.compress.lzf.util.ChunkEncoderFactory;
23 import io.netty.buffer.ByteBuf;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.handler.codec.MessageToByteEncoder;
26 import io.netty.util.internal.PlatformDependent;
27
28 import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN;
29
30
31
32
33
34
35
36 public class LzfEncoder extends MessageToByteEncoder<ByteBuf> {
37
38
39
40
41
42 private static final int MIN_BLOCK_TO_COMPRESS = 16;
43 private static final boolean DEFAULT_SAFE = !PlatformDependent.hasUnsafe();
44
45
46
47
48
49
50
51
52 private final int compressThreshold;
53
54
55
56
57 private final ChunkEncoder encoder;
58
59
60
61
62 private final BufferRecycler recycler;
63
64
65
66
67
68
69
70 public LzfEncoder() {
71 this(DEFAULT_SAFE);
72 }
73
74
75
76
77
78
79
80
81
82
83
84 @Deprecated
85 public LzfEncoder(boolean safeInstance) {
86 this(safeInstance, MAX_CHUNK_LEN);
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100
101 @Deprecated
102 public LzfEncoder(boolean safeInstance, int totalLength) {
103 this(safeInstance, totalLength, MIN_BLOCK_TO_COMPRESS);
104 }
105
106
107
108
109
110
111
112
113
114 public LzfEncoder(int totalLength) {
115 this(DEFAULT_SAFE, totalLength);
116 }
117
118
119
120
121
122
123
124
125
126
127 public LzfEncoder(int totalLength, int compressThreshold) {
128 this(DEFAULT_SAFE, totalLength, compressThreshold);
129 }
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 @Deprecated
147 public LzfEncoder(boolean safeInstance, int totalLength, int compressThreshold) {
148 super(false);
149 if (totalLength < MIN_BLOCK_TO_COMPRESS || totalLength > MAX_CHUNK_LEN) {
150 throw new IllegalArgumentException("totalLength: " + totalLength +
151 " (expected: " + MIN_BLOCK_TO_COMPRESS + '-' + MAX_CHUNK_LEN + ')');
152 }
153
154 if (compressThreshold < MIN_BLOCK_TO_COMPRESS) {
155
156 throw new IllegalArgumentException("compressThreshold:" + compressThreshold +
157 " expected >=" + MIN_BLOCK_TO_COMPRESS);
158 }
159 this.compressThreshold = compressThreshold;
160
161 this.encoder = safeInstance ?
162 ChunkEncoderFactory.safeNonAllocatingInstance(totalLength)
163 : ChunkEncoderFactory.optimalNonAllocatingInstance(totalLength);
164
165 this.recycler = BufferRecycler.instance();
166 }
167
168 @Override
169 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
170 final int length = in.readableBytes();
171 final int idx = in.readerIndex();
172 final byte[] input;
173 final int inputPtr;
174 if (in.hasArray()) {
175 input = in.array();
176 inputPtr = in.arrayOffset() + idx;
177 } else {
178 input = recycler.allocInputBuffer(length);
179 in.getBytes(idx, input, 0, length);
180 inputPtr = 0;
181 }
182
183
184 final int maxOutputLength = LZFEncoder.estimateMaxWorkspaceSize(length) + 1;
185 out.ensureWritable(maxOutputLength);
186 final byte[] output;
187 final int outputPtr;
188 if (out.hasArray()) {
189 output = out.array();
190 outputPtr = out.arrayOffset() + out.writerIndex();
191 } else {
192 output = new byte[maxOutputLength];
193 outputPtr = 0;
194 }
195
196 final int outputLength;
197 if (length >= compressThreshold) {
198
199 outputLength = encodeCompress(input, inputPtr, length, output, outputPtr);
200 } else {
201
202 outputLength = encodeNonCompress(input, inputPtr, length, output, outputPtr);
203 }
204
205 if (out.hasArray()) {
206 out.writerIndex(out.writerIndex() + outputLength);
207 } else {
208 out.writeBytes(output, 0, outputLength);
209 }
210
211 in.skipBytes(length);
212
213 if (!in.hasArray()) {
214 recycler.releaseInputBuffer(input);
215 }
216 }
217
218 private int encodeCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
219 return LZFEncoder.appendEncoded(encoder,
220 input, inputPtr, length, output, outputPtr) - outputPtr;
221 }
222
223 private static int lzfEncodeNonCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
224 int left = length;
225 int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
226 outputPtr = LZFChunk.appendNonCompressed(input, inputPtr, chunkLen, output, outputPtr);
227 left -= chunkLen;
228 if (left < 1) {
229 return outputPtr;
230 }
231 inputPtr += chunkLen;
232 do {
233 chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN);
234 outputPtr = LZFChunk.appendNonCompressed(input, inputPtr, chunkLen, output, outputPtr);
235 inputPtr += chunkLen;
236 left -= chunkLen;
237 } while (left > 0);
238 return outputPtr;
239 }
240
241
242
243
244 private static int encodeNonCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
245 return lzfEncodeNonCompress(input, inputPtr, length, output, outputPtr) - outputPtr;
246 }
247
248 @Override
249 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
250 encoder.close();
251 super.handlerRemoved(ctx);
252 }
253 }