1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.util.concurrent.EventExecutor;
23 import io.netty.util.concurrent.PromiseNotifier;
24 import io.netty.util.internal.EmptyArrays;
25 import io.netty.util.internal.ObjectUtil;
26 import io.netty.util.internal.PlatformDependent;
27 import io.netty.util.internal.SuppressJava6Requirement;
28 import io.netty.util.internal.SystemPropertyUtil;
29 import io.netty.util.internal.logging.InternalLogger;
30 import io.netty.util.internal.logging.InternalLoggerFactory;
31
32 import java.util.zip.CRC32;
33 import java.util.zip.Deflater;
34
35
36
37
38 public class JdkZlibEncoder extends ZlibEncoder {
39
private static final InternalLogger logger = InternalLoggerFactory.getInstance(JdkZlibEncoder.class);

/**
 * Upper bound for the initial capacity of the output buffer created by {@code allocateBuffer}.
 * Read from the system property {@code io.netty.jdkzlib.encoder.maxInitialOutputBufferSize},
 * falling back to 65536.
 */
private static final int MAX_INITIAL_OUTPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
        "io.netty.jdkzlib.encoder.maxInitialOutputBufferSize", 65536);

/**
 * Upper bound for the temporary heap buffer {@code encode} uses to copy input that has no backing array.
 * Read from the system property {@code io.netty.jdkzlib.encoder.maxInputBufferSize},
 * falling back to 65536.
 */
private static final int MAX_INPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
        "io.netty.jdkzlib.encoder.maxInputBufferSize", 65536);

/** Fixed ten-byte GZIP header: magic bytes 0x1f 0x8b, the DEFLATE method id, all remaining bytes zero. */
private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};

static {
    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.jdkzlib.encoder.maxInitialOutputBufferSize={}", MAX_INITIAL_OUTPUT_BUFFER_SIZE);
        logger.debug("-Dio.netty.jdkzlib.encoder.maxInputBufferSize={}", MAX_INPUT_BUFFER_SIZE);
    }
}

/** Stream framing selected at construction time. */
private final ZlibWrapper wrapper;
/** The JDK deflater doing the actual compression; ended in {@code finishEncode}. */
private final Deflater deflater;
/** Set once {@code finishEncode} has started; afterwards {@code encode} passes data through untouched. */
private volatile boolean finished;
/** Context captured in {@code handlerAdded}; {@code null} until the handler is added to a pipeline. */
private volatile ChannelHandlerContext ctx;

/** CRC-32 of the uncompressed input; only updated and emitted when the wrapper is GZIP. */
private final CRC32 crc = new CRC32();
/** {@code true} until the first output has been produced, i.e. while a header may still be owed. */
private boolean writeHeader = true;
77
78
79
80
81
82
83
/**
 * Creates a new encoder that uses the {@link ZlibWrapper#ZLIB} wrapper and compression level {@code 6}.
 */
public JdkZlibEncoder() {
    this(ZlibWrapper.ZLIB, 6);
}
87
88
89
90
91
92
93
94
95
96
97
98
/**
 * Creates a new encoder that uses the {@link ZlibWrapper#ZLIB} wrapper and the given compression level.
 *
 * @param compressionLevel the level handed to the underlying {@link Deflater}; it is range-checked
 *                         against {@link Deflater#DEFAULT_COMPRESSION} and
 *                         {@link Deflater#BEST_COMPRESSION} by the delegated constructor
 */
public JdkZlibEncoder(int compressionLevel) {
    this(ZlibWrapper.ZLIB, compressionLevel);
}
102
103
104
105
106
107
108
/**
 * Creates a new encoder that uses the given wrapper and compression level {@code 6}.
 *
 * @param wrapper the stream framing to produce; must not be {@code null} and must not be
 *                {@link ZlibWrapper#ZLIB_OR_NONE} (both are rejected by the delegated constructor)
 */
public JdkZlibEncoder(ZlibWrapper wrapper) {
    this(wrapper, 6);
}
112
113
114
115
116
117
118
119
120
121
122
123
124
125 public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
126 ObjectUtil.checkInRange(compressionLevel, Deflater.DEFAULT_COMPRESSION, Deflater.BEST_COMPRESSION,
127 "compressionLevel");
128 ObjectUtil.checkNotNull(wrapper, "wrapper");
129
130 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
131 throw new IllegalArgumentException(
132 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
133 "allowed for compression.");
134 }
135
136 this.wrapper = wrapper;
137 deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
138 }
139
140
141
142
143
144
145
146
147
148
149
/**
 * Creates a new encoder that uses compression level {@code 6} and the given preset dictionary.
 *
 * @param dictionary the preset dictionary handed to the underlying {@link Deflater};
 *                   must not be {@code null} (checked by the delegated constructor)
 */
public JdkZlibEncoder(byte[] dictionary) {
    this(6, dictionary);
}
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 public JdkZlibEncoder(int compressionLevel, byte[] dictionary) {
170 ObjectUtil.checkInRange(compressionLevel, Deflater.DEFAULT_COMPRESSION, Deflater.BEST_COMPRESSION,
171 "compressionLevel");
172 ObjectUtil.checkNotNull(dictionary, "dictionary");
173
174 wrapper = ZlibWrapper.ZLIB;
175 deflater = new Deflater(compressionLevel);
176 deflater.setDictionary(dictionary);
177 }
178
179 @Override
180 public ChannelFuture close() {
181 return close(ctx().newPromise());
182 }
183
184 @Override
185 public ChannelFuture close(final ChannelPromise promise) {
186 ChannelHandlerContext ctx = ctx();
187 EventExecutor executor = ctx.executor();
188 if (executor.inEventLoop()) {
189 return finishEncode(ctx, promise);
190 } else {
191 final ChannelPromise p = ctx.newPromise();
192 executor.execute(new Runnable() {
193 @Override
194 public void run() {
195 ChannelFuture f = finishEncode(ctx(), p);
196 PromiseNotifier.cascade(f, promise);
197 }
198 });
199 return p;
200 }
201 }
202
203 private ChannelHandlerContext ctx() {
204 ChannelHandlerContext ctx = this.ctx;
205 if (ctx == null) {
206 throw new IllegalStateException("not added to a pipeline");
207 }
208 return ctx;
209 }
210
211 @Override
212 public boolean isClosed() {
213 return finished;
214 }
215
216 @Override
217 protected void encode(ChannelHandlerContext ctx, ByteBuf uncompressed, ByteBuf out) throws Exception {
218 if (finished) {
219 out.writeBytes(uncompressed);
220 return;
221 }
222
223 int len = uncompressed.readableBytes();
224 if (len == 0) {
225 return;
226 }
227
228 if (uncompressed.hasArray()) {
229
230 encodeSome(uncompressed, out);
231 } else {
232 int heapBufferSize = Math.min(len, MAX_INPUT_BUFFER_SIZE);
233 ByteBuf heapBuf = ctx.alloc().heapBuffer(heapBufferSize, heapBufferSize);
234 try {
235 while (uncompressed.isReadable()) {
236 uncompressed.readBytes(heapBuf, Math.min(heapBuf.writableBytes(), uncompressed.readableBytes()));
237 encodeSome(heapBuf, out);
238 heapBuf.clear();
239 }
240 } finally {
241 heapBuf.release();
242 }
243 }
244
245 deflater.setInput(EmptyArrays.EMPTY_BYTES);
246 }
247
248 private void encodeSome(ByteBuf in, ByteBuf out) {
249
250
251 byte[] inAry = in.array();
252 int offset = in.arrayOffset() + in.readerIndex();
253
254 if (writeHeader) {
255 writeHeader = false;
256 if (wrapper == ZlibWrapper.GZIP) {
257 out.writeBytes(gzipHeader);
258 }
259 }
260
261 int len = in.readableBytes();
262 if (wrapper == ZlibWrapper.GZIP) {
263 crc.update(inAry, offset, len);
264 }
265
266 deflater.setInput(inAry, offset, len);
267 for (;;) {
268 deflate(out);
269 if (!out.isWritable()) {
270
271
272 out.ensureWritable(out.writerIndex());
273 } else if (deflater.needsInput()) {
274
275 break;
276 }
277 }
278 in.skipBytes(len);
279 }
280
281 @Override
282 protected final ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg,
283 boolean preferDirect) throws Exception {
284 int sizeEstimate = (int) Math.ceil(msg.readableBytes() * 1.001) + 12;
285 if (writeHeader) {
286 switch (wrapper) {
287 case GZIP:
288 sizeEstimate += gzipHeader.length;
289 break;
290 case ZLIB:
291 sizeEstimate += 2;
292 break;
293 default:
294
295 }
296 }
297
298 if (sizeEstimate < 0 || sizeEstimate > MAX_INITIAL_OUTPUT_BUFFER_SIZE) {
299
300 return ctx.alloc().heapBuffer(MAX_INITIAL_OUTPUT_BUFFER_SIZE);
301 }
302 return ctx.alloc().heapBuffer(sizeEstimate);
303 }
304
305 @Override
306 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
307 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
308 EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
309 }
310
311 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
312 if (finished) {
313 promise.setSuccess();
314 return promise;
315 }
316
317 finished = true;
318 ByteBuf footer = ctx.alloc().heapBuffer();
319 if (writeHeader && wrapper == ZlibWrapper.GZIP) {
320
321 writeHeader = false;
322 footer.writeBytes(gzipHeader);
323 }
324
325 deflater.finish();
326
327 while (!deflater.finished()) {
328 deflate(footer);
329 if (!footer.isWritable()) {
330
331 ctx.write(footer);
332 footer = ctx.alloc().heapBuffer();
333 }
334 }
335 if (wrapper == ZlibWrapper.GZIP) {
336 int crcValue = (int) crc.getValue();
337 int uncBytes = deflater.getTotalIn();
338 footer.writeByte(crcValue);
339 footer.writeByte(crcValue >>> 8);
340 footer.writeByte(crcValue >>> 16);
341 footer.writeByte(crcValue >>> 24);
342 footer.writeByte(uncBytes);
343 footer.writeByte(uncBytes >>> 8);
344 footer.writeByte(uncBytes >>> 16);
345 footer.writeByte(uncBytes >>> 24);
346 }
347 deflater.end();
348 return ctx.writeAndFlush(footer, promise);
349 }
350
351 @SuppressJava6Requirement(reason = "Usage guarded by java version check")
352 private void deflate(ByteBuf out) {
353 if (PlatformDependent.javaVersion() < 7) {
354 deflateJdk6(out);
355 }
356 int numBytes;
357 do {
358 int writerIndex = out.writerIndex();
359 numBytes = deflater.deflate(
360 out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH);
361 out.writerIndex(writerIndex + numBytes);
362 } while (numBytes > 0);
363 }
364
365 private void deflateJdk6(ByteBuf out) {
366 int numBytes;
367 do {
368 int writerIndex = out.writerIndex();
369 numBytes = deflater.deflate(
370 out.array(), out.arrayOffset() + writerIndex, out.writableBytes());
371 out.writerIndex(writerIndex + numBytes);
372 } while (numBytes > 0);
373 }
374
375 @Override
376 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
377 this.ctx = ctx;
378 }
379 }