1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
import com.jcraft.jzlib.Deflater;
import com.jcraft.jzlib.JZlib;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.EmptyArrays;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
31
32
33
34
35 public class JZlibEncoder extends ZlibEncoder {
36
37 private final int wrapperOverhead;
38 private final Deflater z = new Deflater();
39 private volatile boolean finished;
40 private volatile ChannelHandlerContext ctx;
41
42
43
44
45
46
47
48
49 public JZlibEncoder() {
50 this(6);
51 }
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public JZlibEncoder(int compressionLevel) {
66 this(ZlibWrapper.ZLIB, compressionLevel);
67 }
68
69
70
71
72
73
74
75
76 public JZlibEncoder(ZlibWrapper wrapper) {
77 this(wrapper, 6);
78 }
79
80
81
82
83
84
85
86
87
88
89
90
91
92 public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
93 this(wrapper, compressionLevel, 15, 8);
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118 public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
119
120 if (compressionLevel < 0 || compressionLevel > 9) {
121 throw new IllegalArgumentException(
122 "compressionLevel: " + compressionLevel +
123 " (expected: 0-9)");
124 }
125 if (windowBits < 9 || windowBits > 15) {
126 throw new IllegalArgumentException(
127 "windowBits: " + windowBits + " (expected: 9-15)");
128 }
129 if (memLevel < 1 || memLevel > 9) {
130 throw new IllegalArgumentException(
131 "memLevel: " + memLevel + " (expected: 1-9)");
132 }
133 if (wrapper == null) {
134 throw new NullPointerException("wrapper");
135 }
136 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
137 throw new IllegalArgumentException(
138 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
139 "allowed for compression.");
140 }
141
142 int resultCode = z.init(
143 compressionLevel, windowBits, memLevel,
144 ZlibUtil.convertWrapperType(wrapper));
145 if (resultCode != JZlib.Z_OK) {
146 ZlibUtil.fail(z, "initialization failure", resultCode);
147 }
148
149 wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper);
150 }
151
152
153
154
155
156
157
158
159
160
161
162
163 public JZlibEncoder(byte[] dictionary) {
164 this(6, dictionary);
165 }
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182 public JZlibEncoder(int compressionLevel, byte[] dictionary) {
183 this(compressionLevel, 15, 8, dictionary);
184 }
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211 public JZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
212 if (compressionLevel < 0 || compressionLevel > 9) {
213 throw new IllegalArgumentException("compressionLevel: " + compressionLevel + " (expected: 0-9)");
214 }
215 if (windowBits < 9 || windowBits > 15) {
216 throw new IllegalArgumentException(
217 "windowBits: " + windowBits + " (expected: 9-15)");
218 }
219 if (memLevel < 1 || memLevel > 9) {
220 throw new IllegalArgumentException(
221 "memLevel: " + memLevel + " (expected: 1-9)");
222 }
223 if (dictionary == null) {
224 throw new NullPointerException("dictionary");
225 }
226 int resultCode;
227 resultCode = z.deflateInit(
228 compressionLevel, windowBits, memLevel,
229 JZlib.W_ZLIB);
230 if (resultCode != JZlib.Z_OK) {
231 ZlibUtil.fail(z, "initialization failure", resultCode);
232 } else {
233 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
234 if (resultCode != JZlib.Z_OK) {
235 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
236 }
237 }
238
239 wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB);
240 }
241
242 @Override
243 public ChannelFuture close() {
244 return close(ctx().channel().newPromise());
245 }
246
247 @Override
248 public ChannelFuture close(final ChannelPromise promise) {
249 ChannelHandlerContext ctx = ctx();
250 EventExecutor executor = ctx.executor();
251 if (executor.inEventLoop()) {
252 return finishEncode(ctx, promise);
253 } else {
254 final ChannelPromise p = ctx.newPromise();
255 executor.execute(new Runnable() {
256 @Override
257 public void run() {
258 ChannelFuture f = finishEncode(ctx(), p);
259 f.addListener(new ChannelPromiseNotifier(promise));
260 }
261 });
262 return p;
263 }
264 }
265
266 private ChannelHandlerContext ctx() {
267 ChannelHandlerContext ctx = this.ctx;
268 if (ctx == null) {
269 throw new IllegalStateException("not added to a pipeline");
270 }
271 return ctx;
272 }
273
274 @Override
275 public boolean isClosed() {
276 return finished;
277 }
278
279 @Override
280 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
281 if (finished) {
282 out.writeBytes(in);
283 return;
284 }
285
286 int inputLength = in.readableBytes();
287 if (inputLength == 0) {
288 return;
289 }
290
291 try {
292
293 boolean inHasArray = in.hasArray();
294 z.avail_in = inputLength;
295 if (inHasArray) {
296 z.next_in = in.array();
297 z.next_in_index = in.arrayOffset() + in.readerIndex();
298 } else {
299 byte[] array = new byte[inputLength];
300 in.getBytes(in.readerIndex(), array);
301 z.next_in = array;
302 z.next_in_index = 0;
303 }
304 int oldNextInIndex = z.next_in_index;
305
306
307 int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12 + wrapperOverhead;
308 out.ensureWritable(maxOutputLength);
309 z.avail_out = maxOutputLength;
310 z.next_out = out.array();
311 z.next_out_index = out.arrayOffset() + out.writerIndex();
312 int oldNextOutIndex = z.next_out_index;
313
314
315 int resultCode;
316 try {
317 resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
318 } finally {
319 in.skipBytes(z.next_in_index - oldNextInIndex);
320 }
321
322 if (resultCode != JZlib.Z_OK) {
323 ZlibUtil.fail(z, "compression failure", resultCode);
324 }
325
326 int outputLength = z.next_out_index - oldNextOutIndex;
327 if (outputLength > 0) {
328 out.writerIndex(out.writerIndex() + outputLength);
329 }
330 } finally {
331
332
333
334
335 z.next_in = null;
336 z.next_out = null;
337 }
338 }
339
340 @Override
341 public void close(
342 final ChannelHandlerContext ctx,
343 final ChannelPromise promise) {
344 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
345 f.addListener(new ChannelFutureListener() {
346 @Override
347 public void operationComplete(ChannelFuture f) throws Exception {
348 ctx.close(promise);
349 }
350 });
351
352 if (!f.isDone()) {
353
354 ctx.executor().schedule(new Runnable() {
355 @Override
356 public void run() {
357 ctx.close(promise);
358 }
359 }, 10, TimeUnit.SECONDS);
360 }
361 }
362
363 private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) {
364 if (finished) {
365 promise.setSuccess();
366 return promise;
367 }
368 finished = true;
369
370 ByteBuf footer;
371 try {
372
373 z.next_in = EmptyArrays.EMPTY_BYTES;
374 z.next_in_index = 0;
375 z.avail_in = 0;
376
377
378 byte[] out = new byte[32];
379 z.next_out = out;
380 z.next_out_index = 0;
381 z.avail_out = out.length;
382
383
384 int resultCode = z.deflate(JZlib.Z_FINISH);
385 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
386 promise.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode));
387 return promise;
388 } else if (z.next_out_index != 0) {
389 footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index);
390 } else {
391 footer = Unpooled.EMPTY_BUFFER;
392 }
393 } finally {
394 z.deflateEnd();
395
396
397
398
399
400 z.next_in = null;
401 z.next_out = null;
402 }
403 return ctx.writeAndFlush(footer, promise);
404 }
405
406 @Override
407 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
408 this.ctx = ctx;
409 }
410 }