1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.codec.compression;
17
18 import com.jcraft.jzlib.Deflater;
19 import com.jcraft.jzlib.JZlib;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.util.concurrent.EventExecutor;
27 import io.netty.util.concurrent.Future;
28 import io.netty.util.concurrent.PromiseNotifier;
29 import io.netty.util.internal.EmptyArrays;
30 import io.netty.util.internal.ObjectUtil;
31
32 import java.util.concurrent.TimeUnit;
33
34 /**
35 * Compresses a {@link ByteBuf} using the deflate algorithm.
36 */
37 public class JZlibEncoder extends ZlibEncoder {
38
39 private final int wrapperOverhead;
40 private final Deflater z = new Deflater();
41 private volatile boolean finished;
42 private volatile ChannelHandlerContext ctx;
43
44 private static final int THREAD_POOL_DELAY_SECONDS = 10;
45
46 /**
47 * Creates a new zlib encoder with the default compression level ({@code 6}),
48 * default window bits ({@code 15}), default memory level ({@code 8}),
49 * and the default wrapper ({@link ZlibWrapper#ZLIB}).
50 *
51 * @throws CompressionException if failed to initialize zlib
52 */
53 public JZlibEncoder() {
54 this(6);
55 }
56
57 /**
58 * Creates a new zlib encoder with the specified {@code compressionLevel},
59 * default window bits ({@code 15}), default memory level ({@code 8}),
60 * and the default wrapper ({@link ZlibWrapper#ZLIB}).
61 *
62 * @param compressionLevel
63 * {@code 1} yields the fastest compression and {@code 9} yields the
64 * best compression. {@code 0} means no compression. The default
65 * compression level is {@code 6}.
66 *
67 * @throws CompressionException if failed to initialize zlib
68 */
69 public JZlibEncoder(int compressionLevel) {
70 this(ZlibWrapper.ZLIB, compressionLevel);
71 }
72
73 /**
74 * Creates a new zlib encoder with the default compression level ({@code 6}),
75 * default window bits ({@code 15}), default memory level ({@code 8}),
76 * and the specified wrapper.
77 *
78 * @throws CompressionException if failed to initialize zlib
79 */
80 public JZlibEncoder(ZlibWrapper wrapper) {
81 this(wrapper, 6);
82 }
83
84 /**
85 * Creates a new zlib encoder with the specified {@code compressionLevel},
86 * default window bits ({@code 15}), default memory level ({@code 8}),
87 * and the specified wrapper.
88 *
89 * @param compressionLevel
90 * {@code 1} yields the fastest compression and {@code 9} yields the
91 * best compression. {@code 0} means no compression. The default
92 * compression level is {@code 6}.
93 *
94 * @throws CompressionException if failed to initialize zlib
95 */
96 public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
97 this(wrapper, compressionLevel, 15, 8);
98 }
99
100 /**
101 * Creates a new zlib encoder with the specified {@code compressionLevel},
102 * the specified {@code windowBits}, the specified {@code memLevel}, and
103 * the specified wrapper.
104 *
105 * @param compressionLevel
106 * {@code 1} yields the fastest compression and {@code 9} yields the
107 * best compression. {@code 0} means no compression. The default
108 * compression level is {@code 6}.
109 * @param windowBits
110 * The base two logarithm of the size of the history buffer. The
111 * value should be in the range {@code 9} to {@code 15} inclusive.
112 * Larger values result in better compression at the expense of
113 * memory usage. The default value is {@code 15}.
114 * @param memLevel
115 * How much memory should be allocated for the internal compression
116 * state. {@code 1} uses minimum memory and {@code 9} uses maximum
117 * memory. Larger values result in better and faster compression
118 * at the expense of memory usage. The default value is {@code 8}
119 *
120 * @throws CompressionException if failed to initialize zlib
121 */
122 public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
123 ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
124 ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
125 ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
126 ObjectUtil.checkNotNull(wrapper, "wrapper");
127
128 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
129 throw new IllegalArgumentException(
130 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
131 "allowed for compression.");
132 }
133
134 int resultCode = z.init(
135 compressionLevel, windowBits, memLevel,
136 ZlibUtil.convertWrapperType(wrapper));
137 if (resultCode != JZlib.Z_OK) {
138 ZlibUtil.fail(z, "initialization failure", resultCode);
139 }
140
141 wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper);
142 }
143
144 /**
145 * Creates a new zlib encoder with the default compression level ({@code 6}),
146 * default window bits ({@code 15}), default memory level ({@code 8}),
147 * and the specified preset dictionary. The wrapper is always
148 * {@link ZlibWrapper#ZLIB} because it is the only format that supports
149 * the preset dictionary.
150 *
151 * @param dictionary the preset dictionary
152 *
153 * @throws CompressionException if failed to initialize zlib
154 */
155 public JZlibEncoder(byte[] dictionary) {
156 this(6, dictionary);
157 }
158
159 /**
160 * Creates a new zlib encoder with the specified {@code compressionLevel},
161 * default window bits ({@code 15}), default memory level ({@code 8}),
162 * and the specified preset dictionary. The wrapper is always
163 * {@link ZlibWrapper#ZLIB} because it is the only format that supports
164 * the preset dictionary.
165 *
166 * @param compressionLevel
167 * {@code 1} yields the fastest compression and {@code 9} yields the
168 * best compression. {@code 0} means no compression. The default
169 * compression level is {@code 6}.
170 * @param dictionary the preset dictionary
171 *
172 * @throws CompressionException if failed to initialize zlib
173 */
174 public JZlibEncoder(int compressionLevel, byte[] dictionary) {
175 this(compressionLevel, 15, 8, dictionary);
176 }
177
178 /**
179 * Creates a new zlib encoder with the specified {@code compressionLevel},
180 * the specified {@code windowBits}, the specified {@code memLevel},
181 * and the specified preset dictionary. The wrapper is always
182 * {@link ZlibWrapper#ZLIB} because it is the only format that supports
183 * the preset dictionary.
184 *
185 * @param compressionLevel
186 * {@code 1} yields the fastest compression and {@code 9} yields the
187 * best compression. {@code 0} means no compression. The default
188 * compression level is {@code 6}.
189 * @param windowBits
190 * The base two logarithm of the size of the history buffer. The
191 * value should be in the range {@code 9} to {@code 15} inclusive.
192 * Larger values result in better compression at the expense of
193 * memory usage. The default value is {@code 15}.
194 * @param memLevel
195 * How much memory should be allocated for the internal compression
196 * state. {@code 1} uses minimum memory and {@code 9} uses maximum
197 * memory. Larger values result in better and faster compression
198 * at the expense of memory usage. The default value is {@code 8}
199 * @param dictionary the preset dictionary
200 *
201 * @throws CompressionException if failed to initialize zlib
202 */
203 public JZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
204 ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
205 ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
206 ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
207 ObjectUtil.checkNotNull(dictionary, "dictionary");
208
209 int resultCode;
210 resultCode = z.deflateInit(
211 compressionLevel, windowBits, memLevel,
212 JZlib.W_ZLIB); // Default: ZLIB format
213 if (resultCode != JZlib.Z_OK) {
214 ZlibUtil.fail(z, "initialization failure", resultCode);
215 } else {
216 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
217 if (resultCode != JZlib.Z_OK) {
218 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
219 }
220 }
221
222 wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB);
223 }
224
225 @Override
226 public ChannelFuture close() {
227 return close(ctx().channel().newPromise());
228 }
229
230 @Override
231 public ChannelFuture close(final ChannelPromise promise) {
232 ChannelHandlerContext ctx = ctx();
233 EventExecutor executor = ctx.executor();
234 if (executor.inEventLoop()) {
235 return finishEncode(ctx, promise);
236 } else {
237 final ChannelPromise p = ctx.newPromise();
238 executor.execute(new Runnable() {
239 @Override
240 public void run() {
241 ChannelFuture f = finishEncode(ctx(), p);
242 PromiseNotifier.cascade(f, promise);
243 }
244 });
245 return p;
246 }
247 }
248
249 private ChannelHandlerContext ctx() {
250 ChannelHandlerContext ctx = this.ctx;
251 if (ctx == null) {
252 throw new IllegalStateException("not added to a pipeline");
253 }
254 return ctx;
255 }
256
257 @Override
258 public boolean isClosed() {
259 return finished;
260 }
261
262 @Override
263 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
264 if (finished) {
265 out.writeBytes(in);
266 return;
267 }
268
269 int inputLength = in.readableBytes();
270 if (inputLength == 0) {
271 return;
272 }
273
274 try {
275 // Configure input.
276 boolean inHasArray = in.hasArray();
277 z.avail_in = inputLength;
278 if (inHasArray) {
279 z.next_in = in.array();
280 z.next_in_index = in.arrayOffset() + in.readerIndex();
281 } else {
282 byte[] array = new byte[inputLength];
283 in.getBytes(in.readerIndex(), array);
284 z.next_in = array;
285 z.next_in_index = 0;
286 }
287 int oldNextInIndex = z.next_in_index;
288
289 // Configure output.
290 int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12 + wrapperOverhead;
291 out.ensureWritable(maxOutputLength);
292 z.avail_out = maxOutputLength;
293 z.next_out = out.array();
294 z.next_out_index = out.arrayOffset() + out.writerIndex();
295 int oldNextOutIndex = z.next_out_index;
296
297 // Note that Z_PARTIAL_FLUSH has been deprecated.
298 int resultCode;
299 try {
300 resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
301 } finally {
302 in.skipBytes(z.next_in_index - oldNextInIndex);
303 }
304
305 if (resultCode != JZlib.Z_OK) {
306 ZlibUtil.fail(z, "compression failure", resultCode);
307 }
308
309 int outputLength = z.next_out_index - oldNextOutIndex;
310 if (outputLength > 0) {
311 out.writerIndex(out.writerIndex() + outputLength);
312 }
313 } finally {
314 // Deference the external references explicitly to tell the VM that
315 // the allocated byte arrays are temporary so that the call stack
316 // can be utilized.
317 // I'm not sure if the modern VMs do this optimization though.
318 z.next_in = null;
319 z.next_out = null;
320 }
321 }
322
323 @Override
324 public void close(
325 final ChannelHandlerContext ctx,
326 final ChannelPromise promise) {
327 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
328
329 if (!f.isDone()) {
330 // Ensure the channel is closed even if the write operation completes in time.
331 final Future<?> future = ctx.executor().schedule(new Runnable() {
332 @Override
333 public void run() {
334 if (!promise.isDone()) {
335 ctx.close(promise);
336 }
337 }
338 }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);
339
340 f.addListener(new ChannelFutureListener() {
341 @Override
342 public void operationComplete(ChannelFuture f) {
343 // Cancel the scheduled timeout.
344 future.cancel(true);
345 if (!promise.isDone()) {
346 ctx.close(promise);
347 }
348 }
349 });
350 } else {
351 ctx.close(promise);
352 }
353 }
354
355 private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) {
356 if (finished) {
357 promise.setSuccess();
358 return promise;
359 }
360 finished = true;
361
362 ByteBuf footer;
363 try {
364 // Configure input.
365 z.next_in = EmptyArrays.EMPTY_BYTES;
366 z.next_in_index = 0;
367 z.avail_in = 0;
368
369 // Configure output.
370 byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header
371 z.next_out = out;
372 z.next_out_index = 0;
373 z.avail_out = out.length;
374
375 // Write the ADLER32 checksum (stream footer).
376 int resultCode = z.deflate(JZlib.Z_FINISH);
377 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
378 promise.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode));
379 return promise;
380 } else if (z.next_out_index != 0) {
381 // Suppressed a warning above to be on the safe side
382 // even if z.next_out_index seems to be always 0 here
383 footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index);
384 } else {
385 footer = Unpooled.EMPTY_BUFFER;
386 }
387 } finally {
388 z.deflateEnd();
389
390 // Deference the external references explicitly to tell the VM that
391 // the allocated byte arrays are temporary so that the call stack
392 // can be utilized.
393 // I'm not sure if the modern VMs do this optimization though.
394 z.next_in = null;
395 z.next_out = null;
396 }
397 return ctx.writeAndFlush(footer, promise);
398 }
399
400 @Override
401 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
402 this.ctx = ctx;
403 }
404 }