1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.compression;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.EncoderException;
26 import io.netty.handler.codec.MessageToByteEncoder;
27 import io.netty.util.concurrent.EventExecutor;
28 import io.netty.util.concurrent.PromiseNotifier;
29 import io.netty.util.internal.ObjectUtil;
30 import net.jpountz.lz4.LZ4Compressor;
31 import net.jpountz.lz4.LZ4Exception;
32 import net.jpountz.lz4.LZ4Factory;
33
34 import java.nio.ByteBuffer;
35 import java.util.zip.Checksum;
36
37 import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
38 import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_NON_COMPRESSED;
39 import static io.netty.handler.codec.compression.Lz4Constants.CHECKSUM_OFFSET;
40 import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSED_LENGTH_OFFSET;
41 import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSION_LEVEL_BASE;
42 import static io.netty.handler.codec.compression.Lz4Constants.DECOMPRESSED_LENGTH_OFFSET;
43 import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_BLOCK_SIZE;
44 import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
45 import static io.netty.handler.codec.compression.Lz4Constants.HEADER_LENGTH;
46 import static io.netty.handler.codec.compression.Lz4Constants.MAGIC_NUMBER;
47 import static io.netty.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
48 import static io.netty.handler.codec.compression.Lz4Constants.MIN_BLOCK_SIZE;
49 import static io.netty.handler.codec.compression.Lz4Constants.TOKEN_OFFSET;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
68 static final int DEFAULT_MAX_ENCODE_SIZE = Integer.MAX_VALUE;
69
70 private final int blockSize;
71
72
73
74
75 private final LZ4Compressor compressor;
76
77
78
79
80 private final ByteBufChecksum checksum;
81
82
83
84
85 private final int compressionLevel;
86
87
88
89
90 private ByteBuf buffer;
91
92
93
94
95 private final int maxEncodeSize;
96
97
98
99
100 private volatile boolean finished;
101
102
103
104
105 private volatile ChannelHandlerContext ctx;
106
107
108
109
110
111
112 public Lz4FrameEncoder() {
113 this(false);
114 }
115
116
117
118
119
120
121
122
123
124 public Lz4FrameEncoder(boolean highCompressor) {
125 this(LZ4Factory.fastestInstance(), highCompressor, DEFAULT_BLOCK_SIZE, new Lz4XXHash32(DEFAULT_SEED));
126 }
127
128
129
130
131
132
133
134
135
136
137
138
139
140 public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize, Checksum checksum) {
141 this(factory, highCompressor, blockSize, checksum, DEFAULT_MAX_ENCODE_SIZE);
142 }
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157 public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize,
158 Checksum checksum, int maxEncodeSize) {
159 super(ByteBuf.class);
160 ObjectUtil.checkNotNull(factory, "factory");
161 ObjectUtil.checkNotNull(checksum, "checksum");
162
163 compressor = highCompressor ? factory.highCompressor() : factory.fastCompressor();
164 this.checksum = ByteBufChecksum.wrapChecksum(checksum);
165
166 compressionLevel = compressionLevel(blockSize);
167 this.blockSize = blockSize;
168 this.maxEncodeSize = ObjectUtil.checkPositive(maxEncodeSize, "maxEncodeSize");
169 finished = false;
170 }
171
172
173
174
175 private static int compressionLevel(int blockSize) {
176 if (blockSize < MIN_BLOCK_SIZE || blockSize > MAX_BLOCK_SIZE) {
177 throw new IllegalArgumentException(String.format(
178 "blockSize: %d (expected: %d-%d)", blockSize, MIN_BLOCK_SIZE, MAX_BLOCK_SIZE));
179 }
180 int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1);
181 compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
182 return compressionLevel;
183 }
184
185 @Override
186 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) {
187 return allocateBuffer(ctx, msg, preferDirect, true);
188 }
189
190 private ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect,
191 boolean allowEmptyReturn) {
192 int targetBufSize = 0;
193 int remaining = msg.readableBytes() + buffer.readableBytes();
194
195
196 if (remaining < 0) {
197 throw new EncoderException("too much data to allocate a buffer for compression");
198 }
199
200 while (remaining > 0) {
201 int curSize = Math.min(blockSize, remaining);
202 remaining -= curSize;
203
204 targetBufSize += compressor.maxCompressedLength(curSize) + HEADER_LENGTH;
205 }
206
207
208
209
210 if (targetBufSize > maxEncodeSize || 0 > targetBufSize) {
211 throw new EncoderException(String.format("requested encode buffer size (%d bytes) exceeds the maximum " +
212 "allowable size (%d bytes)", targetBufSize, maxEncodeSize));
213 }
214
215 if (allowEmptyReturn && targetBufSize < blockSize) {
216 return Unpooled.EMPTY_BUFFER;
217 }
218
219 if (preferDirect) {
220 return ctx.alloc().ioBuffer(targetBufSize, targetBufSize);
221 } else {
222 return ctx.alloc().heapBuffer(targetBufSize, targetBufSize);
223 }
224 }
225
226
227
228
229
230
231
232
233 @Override
234 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
235 if (finished) {
236 if (!out.isWritable(in.readableBytes())) {
237
238 throw new IllegalStateException("encode finished and not enough space to write remaining data");
239 }
240 out.writeBytes(in);
241 return;
242 }
243
244 final ByteBuf buffer = this.buffer;
245 int length;
246 while ((length = in.readableBytes()) > 0) {
247 final int nextChunkSize = Math.min(length, buffer.writableBytes());
248 in.readBytes(buffer, nextChunkSize);
249
250 if (!buffer.isWritable()) {
251 flushBufferedData(out);
252 }
253 }
254 }
255
256 private void flushBufferedData(ByteBuf out) {
257 int flushableBytes = buffer.readableBytes();
258 if (flushableBytes == 0) {
259 return;
260 }
261 checksum.reset();
262 checksum.update(buffer, buffer.readerIndex(), flushableBytes);
263 final int check = (int) checksum.getValue();
264
265 final int bufSize = compressor.maxCompressedLength(flushableBytes) + HEADER_LENGTH;
266 out.ensureWritable(bufSize);
267 final int idx = out.writerIndex();
268 int compressedLength;
269 try {
270 ByteBuffer outNioBuffer = out.internalNioBuffer(idx + HEADER_LENGTH, out.writableBytes() - HEADER_LENGTH);
271 int pos = outNioBuffer.position();
272
273 compressor.compress(buffer.internalNioBuffer(buffer.readerIndex(), flushableBytes), outNioBuffer);
274 compressedLength = outNioBuffer.position() - pos;
275 } catch (LZ4Exception e) {
276 throw new CompressionException(e);
277 }
278 final int blockType;
279 if (compressedLength >= flushableBytes) {
280 blockType = BLOCK_TYPE_NON_COMPRESSED;
281 compressedLength = flushableBytes;
282 out.setBytes(idx + HEADER_LENGTH, buffer, buffer.readerIndex(), flushableBytes);
283 } else {
284 blockType = BLOCK_TYPE_COMPRESSED;
285 }
286
287 out.setLong(idx, MAGIC_NUMBER);
288 out.setByte(idx + TOKEN_OFFSET, (byte) (blockType | compressionLevel));
289 out.setIntLE(idx + COMPRESSED_LENGTH_OFFSET, compressedLength);
290 out.setIntLE(idx + DECOMPRESSED_LENGTH_OFFSET, flushableBytes);
291 out.setIntLE(idx + CHECKSUM_OFFSET, check);
292 out.writerIndex(idx + HEADER_LENGTH + compressedLength);
293 buffer.clear();
294 }
295
296 @Override
297 public void flush(final ChannelHandlerContext ctx) throws Exception {
298 if (buffer != null && buffer.isReadable()) {
299 final ByteBuf buf = allocateBuffer(ctx, Unpooled.EMPTY_BUFFER, isPreferDirect(), false);
300 flushBufferedData(buf);
301 ctx.write(buf);
302 }
303 ctx.flush();
304 }
305
306 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
307 if (finished) {
308 promise.setSuccess();
309 return promise;
310 }
311 finished = true;
312
313 final ByteBuf footer = ctx.alloc().heapBuffer(
314 compressor.maxCompressedLength(buffer.readableBytes()) + HEADER_LENGTH);
315 flushBufferedData(footer);
316
317 footer.ensureWritable(HEADER_LENGTH);
318 final int idx = footer.writerIndex();
319 footer.setLong(idx, MAGIC_NUMBER);
320 footer.setByte(idx + TOKEN_OFFSET, (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel));
321 footer.setInt(idx + COMPRESSED_LENGTH_OFFSET, 0);
322 footer.setInt(idx + DECOMPRESSED_LENGTH_OFFSET, 0);
323 footer.setInt(idx + CHECKSUM_OFFSET, 0);
324
325 footer.writerIndex(idx + HEADER_LENGTH);
326
327 return ctx.writeAndFlush(footer, promise);
328 }
329
330
331
332
333 public boolean isClosed() {
334 return finished;
335 }
336
337
338
339
340
341
342 public ChannelFuture close() {
343 return close(ctx().newPromise());
344 }
345
346
347
348
349
350
351 public ChannelFuture close(final ChannelPromise promise) {
352 ChannelHandlerContext ctx = ctx();
353 EventExecutor executor = ctx.executor();
354 if (executor.inEventLoop()) {
355 return finishEncode(ctx, promise);
356 } else {
357 executor.execute(new Runnable() {
358 @Override
359 public void run() {
360 ChannelFuture f = finishEncode(ctx(), promise);
361 PromiseNotifier.cascade(f, promise);
362 }
363 });
364 return promise;
365 }
366 }
367
368 @Override
369 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
370 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
371
372 EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
373 }
374
375 private ChannelHandlerContext ctx() {
376 ChannelHandlerContext ctx = this.ctx;
377 if (ctx == null) {
378 throw new IllegalStateException("not added to a pipeline");
379 }
380 return ctx;
381 }
382
383 @Override
384 public void handlerAdded(ChannelHandlerContext ctx) {
385 this.ctx = ctx;
386
387 buffer = Unpooled.wrappedBuffer(new byte[blockSize]);
388 buffer.clear();
389 }
390
391 @Override
392 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
393 super.handlerRemoved(ctx);
394 if (buffer != null) {
395 buffer.release();
396 buffer = null;
397 }
398 }
399
400 final ByteBuf getBackingBuffer() {
401 return buffer;
402 }
403 }