1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.compression;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.handler.codec.EncoderException;
27 import io.netty.handler.codec.MessageToByteEncoder;
28 import io.netty.util.concurrent.EventExecutor;
29 import io.netty.util.concurrent.PromiseNotifier;
30 import io.netty.util.internal.ObjectUtil;
31 import net.jpountz.lz4.LZ4Compressor;
32 import net.jpountz.lz4.LZ4Exception;
33 import net.jpountz.lz4.LZ4Factory;
34
35 import java.nio.ByteBuffer;
36 import java.util.concurrent.TimeUnit;
37 import java.util.zip.Checksum;
38
39 import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
40 import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_NON_COMPRESSED;
41 import static io.netty.handler.codec.compression.Lz4Constants.CHECKSUM_OFFSET;
42 import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSED_LENGTH_OFFSET;
43 import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSION_LEVEL_BASE;
44 import static io.netty.handler.codec.compression.Lz4Constants.DECOMPRESSED_LENGTH_OFFSET;
45 import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_BLOCK_SIZE;
46 import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
47 import static io.netty.handler.codec.compression.Lz4Constants.HEADER_LENGTH;
48 import static io.netty.handler.codec.compression.Lz4Constants.MAGIC_NUMBER;
49 import static io.netty.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
50 import static io.netty.handler.codec.compression.Lz4Constants.MIN_BLOCK_SIZE;
51 import static io.netty.handler.codec.compression.Lz4Constants.TOKEN_OFFSET;
52 import static io.netty.handler.codec.compression.Lz4Constants.THREAD_POOL_DELAY_SECONDS;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
71 static final int DEFAULT_MAX_ENCODE_SIZE = Integer.MAX_VALUE;
72
73 private final int blockSize;
74
75
76
77
78 private final LZ4Compressor compressor;
79
80
81
82
83 private final ByteBufChecksum checksum;
84
85
86
87
88 private final int compressionLevel;
89
90
91
92
93 private ByteBuf buffer;
94
95
96
97
98 private final int maxEncodeSize;
99
100
101
102
103 private volatile boolean finished;
104
105
106
107
108 private volatile ChannelHandlerContext ctx;
109
110
111
112
113
114
115 public Lz4FrameEncoder() {
116 this(false);
117 }
118
119
120
121
122
123
124
125
126
127 public Lz4FrameEncoder(boolean highCompressor) {
128 this(LZ4Factory.fastestInstance(), highCompressor, DEFAULT_BLOCK_SIZE, new Lz4XXHash32(DEFAULT_SEED));
129 }
130
131
132
133
134
135
136
137
138
139
140
141
142
143 public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize, Checksum checksum) {
144 this(factory, highCompressor, blockSize, checksum, DEFAULT_MAX_ENCODE_SIZE);
145 }
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160 public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize,
161 Checksum checksum, int maxEncodeSize) {
162 ObjectUtil.checkNotNull(factory, "factory");
163 ObjectUtil.checkNotNull(checksum, "checksum");
164
165 compressor = highCompressor ? factory.highCompressor() : factory.fastCompressor();
166 this.checksum = ByteBufChecksum.wrapChecksum(checksum);
167
168 compressionLevel = compressionLevel(blockSize);
169 this.blockSize = blockSize;
170 this.maxEncodeSize = ObjectUtil.checkPositive(maxEncodeSize, "maxEncodeSize");
171 finished = false;
172 }
173
174
175
176
177 private static int compressionLevel(int blockSize) {
178 if (blockSize < MIN_BLOCK_SIZE || blockSize > MAX_BLOCK_SIZE) {
179 throw new IllegalArgumentException(String.format(
180 "blockSize: %d (expected: %d-%d)", blockSize, MIN_BLOCK_SIZE, MAX_BLOCK_SIZE));
181 }
182 int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1);
183 compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
184 return compressionLevel;
185 }
186
187 @Override
188 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) {
189 return allocateBuffer(ctx, msg, preferDirect, true);
190 }
191
192 private ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect,
193 boolean allowEmptyReturn) {
194 int targetBufSize = 0;
195 int remaining = msg.readableBytes() + buffer.readableBytes();
196
197
198 if (remaining < 0) {
199 throw new EncoderException("too much data to allocate a buffer for compression");
200 }
201
202 while (remaining > 0) {
203 int curSize = Math.min(blockSize, remaining);
204 remaining -= curSize;
205
206 targetBufSize += compressor.maxCompressedLength(curSize) + HEADER_LENGTH;
207 }
208
209
210
211
212 if (targetBufSize > maxEncodeSize || 0 > targetBufSize) {
213 throw new EncoderException(String.format("requested encode buffer size (%d bytes) exceeds the maximum " +
214 "allowable size (%d bytes)", targetBufSize, maxEncodeSize));
215 }
216
217 if (allowEmptyReturn && targetBufSize < blockSize) {
218 return Unpooled.EMPTY_BUFFER;
219 }
220
221 if (preferDirect) {
222 return ctx.alloc().ioBuffer(targetBufSize, targetBufSize);
223 } else {
224 return ctx.alloc().heapBuffer(targetBufSize, targetBufSize);
225 }
226 }
227
228
229
230
231
232
233
234
235 @Override
236 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
237 if (finished) {
238 if (!out.isWritable(in.readableBytes())) {
239
240 throw new IllegalStateException("encode finished and not enough space to write remaining data");
241 }
242 out.writeBytes(in);
243 return;
244 }
245
246 final ByteBuf buffer = this.buffer;
247 int length;
248 while ((length = in.readableBytes()) > 0) {
249 final int nextChunkSize = Math.min(length, buffer.writableBytes());
250 in.readBytes(buffer, nextChunkSize);
251
252 if (!buffer.isWritable()) {
253 flushBufferedData(out);
254 }
255 }
256 }
257
258 private void flushBufferedData(ByteBuf out) {
259 int flushableBytes = buffer.readableBytes();
260 if (flushableBytes == 0) {
261 return;
262 }
263 checksum.reset();
264 checksum.update(buffer, buffer.readerIndex(), flushableBytes);
265 final int check = (int) checksum.getValue();
266
267 final int bufSize = compressor.maxCompressedLength(flushableBytes) + HEADER_LENGTH;
268 out.ensureWritable(bufSize);
269 final int idx = out.writerIndex();
270 int compressedLength;
271 try {
272 ByteBuffer outNioBuffer = out.internalNioBuffer(idx + HEADER_LENGTH, out.writableBytes() - HEADER_LENGTH);
273 int pos = outNioBuffer.position();
274
275 compressor.compress(buffer.internalNioBuffer(buffer.readerIndex(), flushableBytes), outNioBuffer);
276 compressedLength = outNioBuffer.position() - pos;
277 } catch (LZ4Exception e) {
278 throw new CompressionException(e);
279 }
280 final int blockType;
281 if (compressedLength >= flushableBytes) {
282 blockType = BLOCK_TYPE_NON_COMPRESSED;
283 compressedLength = flushableBytes;
284 out.setBytes(idx + HEADER_LENGTH, buffer, buffer.readerIndex(), flushableBytes);
285 } else {
286 blockType = BLOCK_TYPE_COMPRESSED;
287 }
288
289 out.setLong(idx, MAGIC_NUMBER);
290 out.setByte(idx + TOKEN_OFFSET, (byte) (blockType | compressionLevel));
291 out.setIntLE(idx + COMPRESSED_LENGTH_OFFSET, compressedLength);
292 out.setIntLE(idx + DECOMPRESSED_LENGTH_OFFSET, flushableBytes);
293 out.setIntLE(idx + CHECKSUM_OFFSET, check);
294 out.writerIndex(idx + HEADER_LENGTH + compressedLength);
295 buffer.clear();
296 }
297
298 @Override
299 public void flush(final ChannelHandlerContext ctx) throws Exception {
300 if (buffer != null && buffer.isReadable()) {
301 final ByteBuf buf = allocateBuffer(ctx, Unpooled.EMPTY_BUFFER, isPreferDirect(), false);
302 flushBufferedData(buf);
303 ctx.write(buf);
304 }
305 ctx.flush();
306 }
307
308 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
309 if (finished) {
310 promise.setSuccess();
311 return promise;
312 }
313 finished = true;
314
315 final ByteBuf footer = ctx.alloc().heapBuffer(
316 compressor.maxCompressedLength(buffer.readableBytes()) + HEADER_LENGTH);
317 flushBufferedData(footer);
318
319 footer.ensureWritable(HEADER_LENGTH);
320 final int idx = footer.writerIndex();
321 footer.setLong(idx, MAGIC_NUMBER);
322 footer.setByte(idx + TOKEN_OFFSET, (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel));
323 footer.setInt(idx + COMPRESSED_LENGTH_OFFSET, 0);
324 footer.setInt(idx + DECOMPRESSED_LENGTH_OFFSET, 0);
325 footer.setInt(idx + CHECKSUM_OFFSET, 0);
326
327 footer.writerIndex(idx + HEADER_LENGTH);
328
329 return ctx.writeAndFlush(footer, promise);
330 }
331
332
333
334
335 public boolean isClosed() {
336 return finished;
337 }
338
339
340
341
342
343
344 public ChannelFuture close() {
345 return close(ctx().newPromise());
346 }
347
348
349
350
351
352
353 public ChannelFuture close(final ChannelPromise promise) {
354 ChannelHandlerContext ctx = ctx();
355 EventExecutor executor = ctx.executor();
356 if (executor.inEventLoop()) {
357 return finishEncode(ctx, promise);
358 } else {
359 executor.execute(new Runnable() {
360 @Override
361 public void run() {
362 ChannelFuture f = finishEncode(ctx(), promise);
363 PromiseNotifier.cascade(f, promise);
364 }
365 });
366 return promise;
367 }
368 }
369
370 @Override
371 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
372 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
373 f.addListener(new ChannelFutureListener() {
374 @Override
375 public void operationComplete(ChannelFuture f) throws Exception {
376 ctx.close(promise);
377 }
378 });
379
380 if (!f.isDone()) {
381
382 ctx.executor().schedule(new Runnable() {
383 @Override
384 public void run() {
385 ctx.close(promise);
386 }
387 }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);
388 }
389 }
390
391 private ChannelHandlerContext ctx() {
392 ChannelHandlerContext ctx = this.ctx;
393 if (ctx == null) {
394 throw new IllegalStateException("not added to a pipeline");
395 }
396 return ctx;
397 }
398
399 @Override
400 public void handlerAdded(ChannelHandlerContext ctx) {
401 this.ctx = ctx;
402
403 buffer = Unpooled.wrappedBuffer(new byte[blockSize]);
404 buffer.clear();
405 }
406
407 @Override
408 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
409 super.handlerRemoved(ctx);
410 if (buffer != null) {
411 buffer.release();
412 buffer = null;
413 }
414 }
415
416 final ByteBuf getBackingBuffer() {
417 return buffer;
418 }
419 }