1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.util.concurrent.EventExecutor;
23 import io.netty.util.concurrent.PromiseNotifier;
24 import io.netty.util.internal.EmptyArrays;
25 import io.netty.util.internal.ObjectUtil;
26 import io.netty.util.internal.SystemPropertyUtil;
27 import io.netty.util.internal.logging.InternalLogger;
28 import io.netty.util.internal.logging.InternalLoggerFactory;
29
30 import java.util.zip.CRC32;
31 import java.util.zip.Deflater;
32
33
34
35
36 public class JdkZlibEncoder extends ZlibEncoder {
37
38 private static final InternalLogger logger = InternalLoggerFactory.getInstance(JdkZlibEncoder.class);
39
40
41
42
43
44 private static final int MAX_INITIAL_OUTPUT_BUFFER_SIZE;
45
46
47
48 private static final int MAX_INPUT_BUFFER_SIZE;
49
50 private final ZlibWrapper wrapper;
51 private final Deflater deflater;
52 private volatile boolean finished;
53 private volatile ChannelHandlerContext ctx;
54
55
56
57
58 private final CRC32 crc = new CRC32();
59 private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
60 private boolean writeHeader = true;
61
62 static {
63 MAX_INITIAL_OUTPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
64 "io.netty.jdkzlib.encoder.maxInitialOutputBufferSize",
65 65536);
66 MAX_INPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
67 "io.netty.jdkzlib.encoder.maxInputBufferSize",
68 65536);
69
70 if (logger.isDebugEnabled()) {
71 logger.debug("-Dio.netty.jdkzlib.encoder.maxInitialOutputBufferSize={}", MAX_INITIAL_OUTPUT_BUFFER_SIZE);
72 logger.debug("-Dio.netty.jdkzlib.encoder.maxInputBufferSize={}", MAX_INPUT_BUFFER_SIZE);
73 }
74 }
75
76
77
78
79
80
81
82 public JdkZlibEncoder() {
83 this(6);
84 }
85
86
87
88
89
90
91
92
93
94
95
96
97 public JdkZlibEncoder(int compressionLevel) {
98 this(ZlibWrapper.ZLIB, compressionLevel);
99 }
100
101
102
103
104
105
106
107 public JdkZlibEncoder(ZlibWrapper wrapper) {
108 this(wrapper, 6);
109 }
110
111
112
113
114
115
116
117
118
119
120
121
122 public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
123 ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
124 ObjectUtil.checkNotNull(wrapper, "wrapper");
125
126 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
127 throw new IllegalArgumentException(
128 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
129 "allowed for compression.");
130 }
131
132 this.wrapper = wrapper;
133 deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
134 }
135
136
137
138
139
140
141
142
143
144
145
146 public JdkZlibEncoder(byte[] dictionary) {
147 this(6, dictionary);
148 }
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164 public JdkZlibEncoder(int compressionLevel, byte[] dictionary) {
165 ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
166 ObjectUtil.checkNotNull(dictionary, "dictionary");
167
168 wrapper = ZlibWrapper.ZLIB;
169 deflater = new Deflater(compressionLevel);
170 deflater.setDictionary(dictionary);
171 }
172
173 @Override
174 public ChannelFuture close() {
175 return close(ctx().newPromise());
176 }
177
178 @Override
179 public ChannelFuture close(final ChannelPromise promise) {
180 ChannelHandlerContext ctx = ctx();
181 EventExecutor executor = ctx.executor();
182 if (executor.inEventLoop()) {
183 return finishEncode(ctx, promise);
184 } else {
185 final ChannelPromise p = ctx.newPromise();
186 executor.execute(new Runnable() {
187 @Override
188 public void run() {
189 ChannelFuture f = finishEncode(ctx(), p);
190 PromiseNotifier.cascade(f, promise);
191 }
192 });
193 return p;
194 }
195 }
196
197 private ChannelHandlerContext ctx() {
198 ChannelHandlerContext ctx = this.ctx;
199 if (ctx == null) {
200 throw new IllegalStateException("not added to a pipeline");
201 }
202 return ctx;
203 }
204
205 @Override
206 public boolean isClosed() {
207 return finished;
208 }
209
210 @Override
211 protected void encode(ChannelHandlerContext ctx, ByteBuf uncompressed, ByteBuf out) throws Exception {
212 if (finished) {
213 out.writeBytes(uncompressed);
214 return;
215 }
216
217 int len = uncompressed.readableBytes();
218 if (len == 0) {
219 return;
220 }
221
222 if (uncompressed.hasArray()) {
223
224 encodeSome(uncompressed, out);
225 } else {
226 int heapBufferSize = Math.min(len, MAX_INPUT_BUFFER_SIZE);
227 ByteBuf heapBuf = ctx.alloc().heapBuffer(heapBufferSize, heapBufferSize);
228 try {
229 while (uncompressed.isReadable()) {
230 uncompressed.readBytes(heapBuf, Math.min(heapBuf.writableBytes(), uncompressed.readableBytes()));
231 encodeSome(heapBuf, out);
232 heapBuf.clear();
233 }
234 } finally {
235 heapBuf.release();
236 }
237 }
238
239 deflater.setInput(EmptyArrays.EMPTY_BYTES);
240 }
241
242 private void encodeSome(ByteBuf in, ByteBuf out) {
243
244
245 byte[] inAry = in.array();
246 int offset = in.arrayOffset() + in.readerIndex();
247
248 if (writeHeader) {
249 writeHeader = false;
250 if (wrapper == ZlibWrapper.GZIP) {
251 out.writeBytes(gzipHeader);
252 }
253 }
254
255 int len = in.readableBytes();
256 if (wrapper == ZlibWrapper.GZIP) {
257 crc.update(inAry, offset, len);
258 }
259
260 deflater.setInput(inAry, offset, len);
261 for (;;) {
262 deflate(out);
263 if (!out.isWritable()) {
264
265
266 out.ensureWritable(out.writerIndex());
267 } else if (deflater.needsInput()) {
268
269 break;
270 }
271 }
272 in.skipBytes(len);
273 }
274
275 @Override
276 protected final ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg,
277 boolean preferDirect) throws Exception {
278 int sizeEstimate = (int) Math.ceil(msg.readableBytes() * 1.001) + 12;
279 if (writeHeader) {
280 switch (wrapper) {
281 case GZIP:
282 sizeEstimate += gzipHeader.length;
283 break;
284 case ZLIB:
285 sizeEstimate += 2;
286 break;
287 default:
288
289 }
290 }
291
292 if (sizeEstimate < 0 || sizeEstimate > MAX_INITIAL_OUTPUT_BUFFER_SIZE) {
293
294 return ctx.alloc().heapBuffer(MAX_INITIAL_OUTPUT_BUFFER_SIZE);
295 }
296 return ctx.alloc().heapBuffer(sizeEstimate);
297 }
298
299 @Override
300 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
301 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
302 EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
303 }
304
305 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
306 if (finished) {
307 promise.setSuccess();
308 return promise;
309 }
310
311 finished = true;
312 ByteBuf footer = ctx.alloc().heapBuffer();
313 if (writeHeader && wrapper == ZlibWrapper.GZIP) {
314
315 writeHeader = false;
316 footer.writeBytes(gzipHeader);
317 }
318
319 deflater.finish();
320
321 while (!deflater.finished()) {
322 deflate(footer);
323 if (!footer.isWritable()) {
324
325 ctx.write(footer);
326 footer = ctx.alloc().heapBuffer();
327 }
328 }
329 if (wrapper == ZlibWrapper.GZIP) {
330 int crcValue = (int) crc.getValue();
331 int uncBytes = deflater.getTotalIn();
332 footer.writeByte(crcValue);
333 footer.writeByte(crcValue >>> 8);
334 footer.writeByte(crcValue >>> 16);
335 footer.writeByte(crcValue >>> 24);
336 footer.writeByte(uncBytes);
337 footer.writeByte(uncBytes >>> 8);
338 footer.writeByte(uncBytes >>> 16);
339 footer.writeByte(uncBytes >>> 24);
340 }
341 deflater.end();
342 return ctx.writeAndFlush(footer, promise);
343 }
344
345 private void deflate(ByteBuf out) {
346 int numBytes;
347 do {
348 int writerIndex = out.writerIndex();
349 numBytes = deflater.deflate(
350 out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH);
351 out.writerIndex(writerIndex + numBytes);
352 } while (numBytes > 0);
353 }
354
355 @Override
356 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
357 this.ctx = ctx;
358 }
359 }