1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.codec.compression;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBuffers;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelEvent;
22 import org.jboss.netty.channel.ChannelFuture;
23 import org.jboss.netty.channel.ChannelFutureListener;
24 import org.jboss.netty.channel.ChannelHandlerContext;
25 import org.jboss.netty.channel.ChannelStateEvent;
26 import org.jboss.netty.channel.Channels;
27 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
28 import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder;
29 import org.jboss.netty.util.internal.jzlib.JZlib;
30 import org.jboss.netty.util.internal.jzlib.ZStream;
31
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34
35 /**
36 * Compresses a {@link ChannelBuffer} using the deflate algorithm.
37 * @apiviz.landmark
38 * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper
39 */
40 public class ZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler {
41
42 private static final byte[] EMPTY_ARRAY = new byte[0];
43
44 private final int wrapperOverhead;
45 private final ZStream z = new ZStream();
46 private final AtomicBoolean finished = new AtomicBoolean();
47 private volatile ChannelHandlerContext ctx;
48
49 /**
50 * Creates a new zlib encoder with the default compression level ({@code 6}),
51 * default window bits ({@code 15}), default memory level ({@code 8}),
52 * and the default wrapper ({@link ZlibWrapper#ZLIB}).
53 *
54 * @throws CompressionException if failed to initialize zlib
55 */
56 public ZlibEncoder() {
57 this(6);
58 }
59
60 /**
61 * Creates a new zlib encoder with the specified {@code compressionLevel},
62 * default window bits ({@code 15}), default memory level ({@code 8}),
63 * and the default wrapper ({@link ZlibWrapper#ZLIB}).
64 *
65 * @param compressionLevel
66 * {@code 1} yields the fastest compression and {@code 9} yields the
67 * best compression. {@code 0} means no compression. The default
68 * compression level is {@code 6}.
69 *
70 * @throws CompressionException if failed to initialize zlib
71 */
72 public ZlibEncoder(int compressionLevel) {
73 this(ZlibWrapper.ZLIB, compressionLevel);
74 }
75
76 /**
77 * Creates a new zlib encoder with the default compression level ({@code 6}),
78 * default window bits ({@code 15}), default memory level ({@code 8}),
79 * and the specified wrapper.
80 *
81 * @throws CompressionException if failed to initialize zlib
82 */
83 public ZlibEncoder(ZlibWrapper wrapper) {
84 this(wrapper, 6);
85 }
86
87 /**
88 * Creates a new zlib encoder with the specified {@code compressionLevel},
89 * default window bits ({@code 15}), default memory level ({@code 8}),
90 * and the specified wrapper.
91 *
92 * @param compressionLevel
93 * {@code 1} yields the fastest compression and {@code 9} yields the
94 * best compression. {@code 0} means no compression. The default
95 * compression level is {@code 6}.
96 *
97 * @throws CompressionException if failed to initialize zlib
98 */
99 public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
100 this(wrapper, compressionLevel, 15, 8);
101 }
102
103 /**
104 * Creates a new zlib encoder with the specified {@code compressionLevel},
105 * the specified {@code windowBits}, the specified {@code memLevel}, and
106 * the specified wrapper.
107 *
108 * @param compressionLevel
109 * {@code 1} yields the fastest compression and {@code 9} yields the
110 * best compression. {@code 0} means no compression. The default
111 * compression level is {@code 6}.
112 * @param windowBits
113 * The base two logarithm of the size of the history buffer. The
114 * value should be in the range {@code 9} to {@code 15} inclusive.
115 * Larger values result in better compression at the expense of
116 * memory usage. The default value is {@code 15}.
117 * @param memLevel
118 * How much memory should be allocated for the internal compression
119 * state. {@code 1} uses minimum memory and {@code 9} uses maximum
120 * memory. Larger values result in better and faster compression
121 * at the expense of memory usage. The default value is {@code 8}
122 *
123 * @throws CompressionException if failed to initialize zlib
124 */
125 public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
126 if (compressionLevel < 0 || compressionLevel > 9) {
127 throw new IllegalArgumentException(
128 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
129 }
130 if (windowBits < 9 || windowBits > 15) {
131 throw new IllegalArgumentException(
132 "windowBits: " + windowBits + " (expected: 9-15)");
133 }
134 if (memLevel < 1 || memLevel > 9) {
135 throw new IllegalArgumentException(
136 "memLevel: " + memLevel + " (expected: 1-9)");
137 }
138 if (wrapper == null) {
139 throw new NullPointerException("wrapper");
140 }
141 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
142 throw new IllegalArgumentException(
143 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
144 "allowed for compression.");
145 }
146
147 wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper);
148
149 synchronized (z) {
150 int resultCode = z.deflateInit(compressionLevel, windowBits, memLevel,
151 ZlibUtil.convertWrapperType(wrapper));
152 if (resultCode != JZlib.Z_OK) {
153 ZlibUtil.fail(z, "initialization failure", resultCode);
154 }
155 }
156 }
157
158 /**
159 * Creates a new zlib encoder with the default compression level ({@code 6}),
160 * default window bits ({@code 15}), default memory level ({@code 8}),
161 * and the specified preset dictionary. The wrapper is always
162 * {@link ZlibWrapper#ZLIB} because it is the only format that supports
163 * the preset dictionary.
164 *
165 * @param dictionary the preset dictionary
166 *
167 * @throws CompressionException if failed to initialize zlib
168 */
169 public ZlibEncoder(byte[] dictionary) {
170 this(6, dictionary);
171 }
172
173 /**
174 * Creates a new zlib encoder with the specified {@code compressionLevel},
175 * default window bits ({@code 15}), default memory level ({@code 8}),
176 * and the specified preset dictionary. The wrapper is always
177 * {@link ZlibWrapper#ZLIB} because it is the only format that supports
178 * the preset dictionary.
179 *
180 * @param compressionLevel
181 * {@code 1} yields the fastest compression and {@code 9} yields the
182 * best compression. {@code 0} means no compression. The default
183 * compression level is {@code 6}.
184 * @param dictionary the preset dictionary
185 *
186 * @throws CompressionException if failed to initialize zlib
187 */
188 public ZlibEncoder(int compressionLevel, byte[] dictionary) {
189 this(compressionLevel, 15, 8, dictionary);
190 }
191
192 /**
193 * Creates a new zlib encoder with the specified {@code compressionLevel},
194 * the specified {@code windowBits}, the specified {@code memLevel},
195 * and the specified preset dictionary. The wrapper is always
196 * {@link ZlibWrapper#ZLIB} because it is the only format that supports
197 * the preset dictionary.
198 *
199 * @param compressionLevel
200 * {@code 1} yields the fastest compression and {@code 9} yields the
201 * best compression. {@code 0} means no compression. The default
202 * compression level is {@code 6}.
203 * @param windowBits
204 * The base two logarithm of the size of the history buffer. The
205 * value should be in the range {@code 9} to {@code 15} inclusive.
206 * Larger values result in better compression at the expense of
207 * memory usage. The default value is {@code 15}.
208 * @param memLevel
209 * How much memory should be allocated for the internal compression
210 * state. {@code 1} uses minimum memory and {@code 9} uses maximum
211 * memory. Larger values result in better and faster compression
212 * at the expense of memory usage. The default value is {@code 8}
213 * @param dictionary the preset dictionary
214 *
215 * @throws CompressionException if failed to initialize zlib
216 */
217 public ZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
218 if (compressionLevel < 0 || compressionLevel > 9) {
219 throw new IllegalArgumentException(
220 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
221 }
222 if (windowBits < 9 || windowBits > 15) {
223 throw new IllegalArgumentException(
224 "windowBits: " + windowBits + " (expected: 9-15)");
225 }
226 if (memLevel < 1 || memLevel > 9) {
227 throw new IllegalArgumentException(
228 "memLevel: " + memLevel + " (expected: 1-9)");
229 }
230 if (dictionary == null) {
231 throw new NullPointerException("dictionary");
232 }
233
234 wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB);
235
236 synchronized (z) {
237 int resultCode;
238 resultCode = z.deflateInit(compressionLevel, windowBits, memLevel,
239 JZlib.W_ZLIB); // Default: ZLIB format
240 if (resultCode != JZlib.Z_OK) {
241 ZlibUtil.fail(z, "initialization failure", resultCode);
242 } else {
243 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
244 if (resultCode != JZlib.Z_OK) {
245 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
246 }
247 }
248 }
249 }
250
251 public ChannelFuture close() {
252 ChannelHandlerContext ctx = this.ctx;
253 if (ctx == null) {
254 throw new IllegalStateException("not added to a pipeline");
255 }
256 return finishEncode(ctx, null);
257 }
258
259 public boolean isClosed() {
260 return finished.get();
261 }
262
263 @Override
264 protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
265 if (!(msg instanceof ChannelBuffer) || finished.get()) {
266 return msg;
267 }
268
269 final ChannelBuffer result;
270 synchronized (z) {
271 try {
272 // Configure input.
273 final ChannelBuffer uncompressed = (ChannelBuffer) msg;
274 final int uncompressedLen = uncompressed.readableBytes();
275 if (uncompressedLen == 0) {
276 return uncompressed;
277 }
278
279 final byte[] in = new byte[uncompressedLen];
280 uncompressed.readBytes(in);
281 z.next_in = in;
282 z.next_in_index = 0;
283 z.avail_in = uncompressedLen;
284
285 // Configure output.
286 final byte[] out = new byte[(int) Math.ceil(uncompressedLen * 1.001) + 12 + wrapperOverhead];
287 z.next_out = out;
288 z.next_out_index = 0;
289 z.avail_out = out.length;
290
291 // Note that Z_PARTIAL_FLUSH has been deprecated.
292 final int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
293 if (resultCode != JZlib.Z_OK) {
294 ZlibUtil.fail(z, "compression failure", resultCode);
295 }
296
297 if (z.next_out_index != 0) {
298 result = ctx.getChannel().getConfig().getBufferFactory().getBuffer(
299 uncompressed.order(), out, 0, z.next_out_index);
300 } else {
301 result = ChannelBuffers.EMPTY_BUFFER;
302 }
303 } finally {
304 // Deference the external references explicitly to tell the VM that
305 // the allocated byte arrays are temporary so that the call stack
306 // can be utilized.
307 // I'm not sure if the modern VMs do this optimization though.
308 z.next_in = null;
309 z.next_out = null;
310 }
311 }
312
313 return result;
314 }
315
316 @Override
317 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
318 throws Exception {
319 if (evt instanceof ChannelStateEvent) {
320 ChannelStateEvent e = (ChannelStateEvent) evt;
321 switch (e.getState()) {
322 case OPEN:
323 case CONNECTED:
324 case BOUND:
325 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
326 finishEncode(ctx, evt);
327 return;
328 }
329 }
330 }
331
332 super.handleDownstream(ctx, evt);
333 }
334
335 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
336 if (!finished.compareAndSet(false, true)) {
337 if (evt != null) {
338 ctx.sendDownstream(evt);
339 }
340 return Channels.succeededFuture(ctx.getChannel());
341 }
342
343 ChannelBuffer footer;
344 ChannelFuture future;
345 synchronized (z) {
346 try {
347 // Configure input.
348 z.next_in = EMPTY_ARRAY;
349 z.next_in_index = 0;
350 z.avail_in = 0;
351
352 // Configure output.
353 byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header
354 z.next_out = out;
355 z.next_out_index = 0;
356 z.avail_out = out.length;
357
358 // Write the ADLER32 checksum (stream footer).
359 int resultCode = z.deflate(JZlib.Z_FINISH);
360 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
361 future = Channels.failedFuture(
362 ctx.getChannel(),
363 ZlibUtil.exception(z, "compression failure", resultCode));
364 footer = null;
365 } else if (z.next_out_index != 0) {
366 future = Channels.future(ctx.getChannel());
367 footer =
368 ctx.getChannel().getConfig().getBufferFactory().getBuffer(
369 out, 0, z.next_out_index);
370 } else {
371 // Note that we should never use a SucceededChannelFuture
372 // here just in case any downstream handler or a sink wants
373 // to notify a write error.
374 future = Channels.future(ctx.getChannel());
375 footer = ChannelBuffers.EMPTY_BUFFER;
376 }
377 } finally {
378 z.deflateEnd();
379
380 // Deference the external references explicitly to tell the VM that
381 // the allocated byte arrays are temporary so that the call stack
382 // can be utilized.
383 // I'm not sure if the modern VMs do this optimization though.
384 z.next_in = null;
385 z.next_out = null;
386 }
387 }
388
389 if (footer != null) {
390 Channels.write(ctx, future, footer);
391 }
392
393 if (evt != null) {
394 future.addListener(new ChannelFutureListener() {
395 public void operationComplete(ChannelFuture future) throws Exception {
396 ctx.sendDownstream(evt);
397 }
398 });
399 }
400
401 return future;
402 }
403
404 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
405 this.ctx = ctx;
406 }
407
408 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
409 // Unused
410 }
411
412 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
413 // Unused
414 }
415
416 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
417 // Unused
418 }
419 }