1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.util.concurrent.EventExecutor;
23 import io.netty.util.concurrent.PromiseNotifier;
24 import io.netty.util.internal.EmptyArrays;
25 import io.netty.util.internal.ObjectUtil;
26 import io.netty.util.internal.SystemPropertyUtil;
27 import io.netty.util.internal.logging.InternalLogger;
28 import io.netty.util.internal.logging.InternalLoggerFactory;
29
30 import java.util.zip.CRC32;
31 import java.util.zip.Deflater;
32
33
34
35
36 public class JdkZlibEncoder extends ZlibEncoder {
37
38 private static final InternalLogger logger = InternalLoggerFactory.getInstance(JdkZlibEncoder.class);
39
40
41
42
43
44 private static final int MAX_INITIAL_OUTPUT_BUFFER_SIZE;
45
46
47
48 private static final int MAX_INPUT_BUFFER_SIZE;
49
50 private final ZlibWrapper wrapper;
51 private final Deflater deflater;
52 private volatile boolean finished;
53 private volatile ChannelHandlerContext ctx;
54
55
56
57
58 private final CRC32 crc = new CRC32();
59 private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
60 private boolean writeHeader = true;
61
62 static {
63 MAX_INITIAL_OUTPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
64 "io.netty.jdkzlib.encoder.maxInitialOutputBufferSize",
65 65536);
66 MAX_INPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
67 "io.netty.jdkzlib.encoder.maxInputBufferSize",
68 65536);
69
70 if (logger.isDebugEnabled()) {
71 logger.debug("-Dio.netty.jdkzlib.encoder.maxInitialOutputBufferSize={}", MAX_INITIAL_OUTPUT_BUFFER_SIZE);
72 logger.debug("-Dio.netty.jdkzlib.encoder.maxInputBufferSize={}", MAX_INPUT_BUFFER_SIZE);
73 }
74 }
75
76
77
78
79
80
81
82 public JdkZlibEncoder() {
83 this(6);
84 }
85
86
87
88
89
90
91
92
93
94
95
96
97 public JdkZlibEncoder(int compressionLevel) {
98 this(ZlibWrapper.ZLIB, compressionLevel);
99 }
100
101
102
103
104
105
106
107 public JdkZlibEncoder(ZlibWrapper wrapper) {
108 this(wrapper, 6);
109 }
110
111
112
113
114
115
116
117
118
119
120
121
122
123 public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
124 ObjectUtil.checkInRange(compressionLevel, Deflater.DEFAULT_COMPRESSION, Deflater.BEST_COMPRESSION,
125 "compressionLevel");
126 ObjectUtil.checkNotNull(wrapper, "wrapper");
127
128 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
129 throw new IllegalArgumentException(
130 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
131 "allowed for compression.");
132 }
133
134 this.wrapper = wrapper;
135 deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
136 }
137
138
139
140
141
142
143
144
145
146
147
148 public JdkZlibEncoder(byte[] dictionary) {
149 this(6, dictionary);
150 }
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167 public JdkZlibEncoder(int compressionLevel, byte[] dictionary) {
168 ObjectUtil.checkInRange(compressionLevel, Deflater.DEFAULT_COMPRESSION, Deflater.BEST_COMPRESSION,
169 "compressionLevel");
170 ObjectUtil.checkNotNull(dictionary, "dictionary");
171
172 wrapper = ZlibWrapper.ZLIB;
173 deflater = new Deflater(compressionLevel);
174 deflater.setDictionary(dictionary);
175 }
176
177 @Override
178 public ChannelFuture close() {
179 return close(ctx().newPromise());
180 }
181
182 @Override
183 public ChannelFuture close(final ChannelPromise promise) {
184 ChannelHandlerContext ctx = ctx();
185 EventExecutor executor = ctx.executor();
186 if (executor.inEventLoop()) {
187 return finishEncode(ctx, promise);
188 } else {
189 final ChannelPromise p = ctx.newPromise();
190 executor.execute(new Runnable() {
191 @Override
192 public void run() {
193 ChannelFuture f = finishEncode(ctx(), p);
194 PromiseNotifier.cascade(f, promise);
195 }
196 });
197 return p;
198 }
199 }
200
201 private ChannelHandlerContext ctx() {
202 ChannelHandlerContext ctx = this.ctx;
203 if (ctx == null) {
204 throw new IllegalStateException("not added to a pipeline");
205 }
206 return ctx;
207 }
208
209 @Override
210 public boolean isClosed() {
211 return finished;
212 }
213
214 @Override
215 protected void encode(ChannelHandlerContext ctx, ByteBuf uncompressed, ByteBuf out) throws Exception {
216 if (finished) {
217 out.writeBytes(uncompressed);
218 return;
219 }
220
221 int len = uncompressed.readableBytes();
222 if (len == 0) {
223 return;
224 }
225
226 if (uncompressed.hasArray()) {
227
228 encodeSome(uncompressed, out);
229 } else {
230 int heapBufferSize = Math.min(len, MAX_INPUT_BUFFER_SIZE);
231 ByteBuf heapBuf = ctx.alloc().heapBuffer(heapBufferSize, heapBufferSize);
232 try {
233 while (uncompressed.isReadable()) {
234 uncompressed.readBytes(heapBuf, Math.min(heapBuf.writableBytes(), uncompressed.readableBytes()));
235 encodeSome(heapBuf, out);
236 heapBuf.clear();
237 }
238 } finally {
239 heapBuf.release();
240 }
241 }
242
243 deflater.setInput(EmptyArrays.EMPTY_BYTES);
244 }
245
246 private void encodeSome(ByteBuf in, ByteBuf out) {
247
248
249 byte[] inAry = in.array();
250 int offset = in.arrayOffset() + in.readerIndex();
251
252 if (writeHeader) {
253 writeHeader = false;
254 if (wrapper == ZlibWrapper.GZIP) {
255 out.writeBytes(gzipHeader);
256 }
257 }
258
259 int len = in.readableBytes();
260 if (wrapper == ZlibWrapper.GZIP) {
261 crc.update(inAry, offset, len);
262 }
263
264 deflater.setInput(inAry, offset, len);
265 for (;;) {
266 deflate(out);
267 if (!out.isWritable()) {
268
269
270 out.ensureWritable(out.writerIndex());
271 } else if (deflater.needsInput()) {
272
273 break;
274 }
275 }
276 in.skipBytes(len);
277 }
278
279 @Override
280 protected final ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg,
281 boolean preferDirect) throws Exception {
282 int sizeEstimate = (int) Math.ceil(msg.readableBytes() * 1.001) + 12;
283 if (writeHeader) {
284 switch (wrapper) {
285 case GZIP:
286 sizeEstimate += gzipHeader.length;
287 break;
288 case ZLIB:
289 sizeEstimate += 2;
290 break;
291 default:
292
293 }
294 }
295
296 if (sizeEstimate < 0 || sizeEstimate > MAX_INITIAL_OUTPUT_BUFFER_SIZE) {
297
298 return ctx.alloc().heapBuffer(MAX_INITIAL_OUTPUT_BUFFER_SIZE);
299 }
300 return ctx.alloc().heapBuffer(sizeEstimate);
301 }
302
303 @Override
304 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
305 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
306 EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
307 }
308
309 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
310 if (finished) {
311 promise.setSuccess();
312 return promise;
313 }
314
315 finished = true;
316 ByteBuf footer = ctx.alloc().heapBuffer();
317 if (writeHeader && wrapper == ZlibWrapper.GZIP) {
318
319 writeHeader = false;
320 footer.writeBytes(gzipHeader);
321 }
322
323 deflater.finish();
324
325 while (!deflater.finished()) {
326 deflate(footer);
327 if (!footer.isWritable()) {
328
329 ctx.write(footer);
330 footer = ctx.alloc().heapBuffer();
331 }
332 }
333 if (wrapper == ZlibWrapper.GZIP) {
334 int crcValue = (int) crc.getValue();
335 int uncBytes = deflater.getTotalIn();
336 footer.writeByte(crcValue);
337 footer.writeByte(crcValue >>> 8);
338 footer.writeByte(crcValue >>> 16);
339 footer.writeByte(crcValue >>> 24);
340 footer.writeByte(uncBytes);
341 footer.writeByte(uncBytes >>> 8);
342 footer.writeByte(uncBytes >>> 16);
343 footer.writeByte(uncBytes >>> 24);
344 }
345 deflater.end();
346 return ctx.writeAndFlush(footer, promise);
347 }
348
349 private void deflate(ByteBuf out) {
350 int numBytes;
351 do {
352 int writerIndex = out.writerIndex();
353 numBytes = deflater.deflate(
354 out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH);
355 out.writerIndex(writerIndex + numBytes);
356 } while (numBytes > 0);
357 }
358
359 @Override
360 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
361 this.ctx = ctx;
362 }
363 }