1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import com.jcraft.jzlib.Deflater;
19 import com.jcraft.jzlib.JZlib;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.util.concurrent.EventExecutor;
27 import io.netty.util.concurrent.PromiseNotifier;
28 import io.netty.util.internal.EmptyArrays;
29 import io.netty.util.internal.ObjectUtil;
30
31 import java.util.concurrent.TimeUnit;
32
33
34
35
36 public class JZlibEncoder extends ZlibEncoder {
37
38 private final int wrapperOverhead;
39 private final Deflater z = new Deflater();
40 private volatile boolean finished;
41 private volatile ChannelHandlerContext ctx;
42
43 private static final int THREAD_POOL_DELAY_SECONDS = 10;
44
45
46
47
48
49
50
51
52 public JZlibEncoder() {
53 this(6);
54 }
55
56
57
58
59
60
61
62
63
64
65
66
67
68 public JZlibEncoder(int compressionLevel) {
69 this(ZlibWrapper.ZLIB, compressionLevel);
70 }
71
72
73
74
75
76
77
78
79 public JZlibEncoder(ZlibWrapper wrapper) {
80 this(wrapper, 6);
81 }
82
83
84
85
86
87
88
89
90
91
92
93
94
95 public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
96 this(wrapper, compressionLevel, 15, 8);
97 }
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121 public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
122 ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
123 ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
124 ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
125 ObjectUtil.checkNotNull(wrapper, "wrapper");
126
127 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
128 throw new IllegalArgumentException(
129 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
130 "allowed for compression.");
131 }
132
133 int resultCode = z.init(
134 compressionLevel, windowBits, memLevel,
135 ZlibUtil.convertWrapperType(wrapper));
136 if (resultCode != JZlib.Z_OK) {
137 ZlibUtil.fail(z, "initialization failure", resultCode);
138 }
139
140 wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper);
141 }
142
143
144
145
146
147
148
149
150
151
152
153
154 public JZlibEncoder(byte[] dictionary) {
155 this(6, dictionary);
156 }
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173 public JZlibEncoder(int compressionLevel, byte[] dictionary) {
174 this(compressionLevel, 15, 8, dictionary);
175 }
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202 public JZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
203 ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
204 ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
205 ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
206 ObjectUtil.checkNotNull(dictionary, "dictionary");
207
208 int resultCode;
209 resultCode = z.deflateInit(
210 compressionLevel, windowBits, memLevel,
211 JZlib.W_ZLIB);
212 if (resultCode != JZlib.Z_OK) {
213 ZlibUtil.fail(z, "initialization failure", resultCode);
214 } else {
215 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
216 if (resultCode != JZlib.Z_OK) {
217 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
218 }
219 }
220
221 wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB);
222 }
223
224 @Override
225 public ChannelFuture close() {
226 return close(ctx().channel().newPromise());
227 }
228
229 @Override
230 public ChannelFuture close(final ChannelPromise promise) {
231 ChannelHandlerContext ctx = ctx();
232 EventExecutor executor = ctx.executor();
233 if (executor.inEventLoop()) {
234 return finishEncode(ctx, promise);
235 } else {
236 final ChannelPromise p = ctx.newPromise();
237 executor.execute(new Runnable() {
238 @Override
239 public void run() {
240 ChannelFuture f = finishEncode(ctx(), p);
241 PromiseNotifier.cascade(f, promise);
242 }
243 });
244 return p;
245 }
246 }
247
248 private ChannelHandlerContext ctx() {
249 ChannelHandlerContext ctx = this.ctx;
250 if (ctx == null) {
251 throw new IllegalStateException("not added to a pipeline");
252 }
253 return ctx;
254 }
255
256 @Override
257 public boolean isClosed() {
258 return finished;
259 }
260
261 @Override
262 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
263 if (finished) {
264 out.writeBytes(in);
265 return;
266 }
267
268 int inputLength = in.readableBytes();
269 if (inputLength == 0) {
270 return;
271 }
272
273 try {
274
275 boolean inHasArray = in.hasArray();
276 z.avail_in = inputLength;
277 if (inHasArray) {
278 z.next_in = in.array();
279 z.next_in_index = in.arrayOffset() + in.readerIndex();
280 } else {
281 byte[] array = new byte[inputLength];
282 in.getBytes(in.readerIndex(), array);
283 z.next_in = array;
284 z.next_in_index = 0;
285 }
286 int oldNextInIndex = z.next_in_index;
287
288
289 int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12 + wrapperOverhead;
290 out.ensureWritable(maxOutputLength);
291 z.avail_out = maxOutputLength;
292 z.next_out = out.array();
293 z.next_out_index = out.arrayOffset() + out.writerIndex();
294 int oldNextOutIndex = z.next_out_index;
295
296
297 int resultCode;
298 try {
299 resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
300 } finally {
301 in.skipBytes(z.next_in_index - oldNextInIndex);
302 }
303
304 if (resultCode != JZlib.Z_OK) {
305 ZlibUtil.fail(z, "compression failure", resultCode);
306 }
307
308 int outputLength = z.next_out_index - oldNextOutIndex;
309 if (outputLength > 0) {
310 out.writerIndex(out.writerIndex() + outputLength);
311 }
312 } finally {
313
314
315
316
317 z.next_in = null;
318 z.next_out = null;
319 }
320 }
321
322 @Override
323 public void close(
324 final ChannelHandlerContext ctx,
325 final ChannelPromise promise) {
326 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
327 f.addListener(new ChannelFutureListener() {
328 @Override
329 public void operationComplete(ChannelFuture f) throws Exception {
330 ctx.close(promise);
331 }
332 });
333
334 if (!f.isDone()) {
335
336 ctx.executor().schedule(new Runnable() {
337 @Override
338 public void run() {
339 ctx.close(promise);
340 }
341 }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);
342 }
343 }
344
345 private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) {
346 if (finished) {
347 promise.setSuccess();
348 return promise;
349 }
350 finished = true;
351
352 ByteBuf footer;
353 try {
354
355 z.next_in = EmptyArrays.EMPTY_BYTES;
356 z.next_in_index = 0;
357 z.avail_in = 0;
358
359
360 byte[] out = new byte[32];
361 z.next_out = out;
362 z.next_out_index = 0;
363 z.avail_out = out.length;
364
365
366 int resultCode = z.deflate(JZlib.Z_FINISH);
367 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
368 promise.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode));
369 return promise;
370 } else if (z.next_out_index != 0) {
371
372
373 footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index);
374 } else {
375 footer = Unpooled.EMPTY_BUFFER;
376 }
377 } finally {
378 z.deflateEnd();
379
380
381
382
383
384 z.next_in = null;
385 z.next_out = null;
386 }
387 return ctx.writeAndFlush(footer, promise);
388 }
389
390 @Override
391 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
392 this.ctx = ctx;
393 }
394 }