1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.codec.compression;
17
18 import java.util.concurrent.atomic.AtomicBoolean;
19
20 import org.jboss.netty.buffer.ChannelBuffer;
21 import org.jboss.netty.buffer.ChannelBuffers;
22 import org.jboss.netty.channel.Channel;
23 import org.jboss.netty.channel.ChannelEvent;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.Channels;
29 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
30 import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder;
31 import org.jboss.netty.util.internal.jzlib.JZlib;
32 import org.jboss.netty.util.internal.jzlib.ZStream;
33
34
35 /**
36 * Compresses a {@link ChannelBuffer} using the deflate algorithm.
37 * @apiviz.landmark
38 * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper
39 */
40 public class ZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler {
41
42 private static final byte[] EMPTY_ARRAY = new byte[0];
43
44 private final ZStream z = new ZStream();
45 private final AtomicBoolean finished = new AtomicBoolean();
46 private volatile ChannelHandlerContext ctx;
47
48 /**
49 * Creates a new zlib encoder with the default compression level ({@code 6}),
50 * default window bits ({@code 15}), default memory level ({@code 8}),
51 * and the default wrapper ({@link ZlibWrapper#ZLIB}).
52 *
53 * @throws CompressionException if failed to initialize zlib
54 */
55 public ZlibEncoder() {
56 this(6);
57 }
58
59 /**
60 * Creates a new zlib encoder with the specified {@code compressionLevel},
61 * default window bits ({@code 15}), default memory level ({@code 8}),
62 * and the default wrapper ({@link ZlibWrapper#ZLIB}).
63 *
64 * @param compressionLevel
65 * {@code 1} yields the fastest compression and {@code 9} yields the
66 * best compression. {@code 0} means no compression. The default
67 * compression level is {@code 6}.
68 *
69 * @throws CompressionException if failed to initialize zlib
70 */
71 public ZlibEncoder(int compressionLevel) {
72 this(ZlibWrapper.ZLIB, compressionLevel);
73 }
74
75 /**
76 * Creates a new zlib encoder with the default compression level ({@code 6}),
77 * default window bits ({@code 15}), default memory level ({@code 8}),
78 * and the specified wrapper.
79 *
80 * @throws CompressionException if failed to initialize zlib
81 */
82 public ZlibEncoder(ZlibWrapper wrapper) {
83 this(wrapper, 6);
84 }
85
86 /**
87 * Creates a new zlib encoder with the specified {@code compressionLevel},
88 * default window bits ({@code 15}), default memory level ({@code 8}),
89 * and the specified wrapper.
90 *
91 * @param compressionLevel
92 * {@code 1} yields the fastest compression and {@code 9} yields the
93 * best compression. {@code 0} means no compression. The default
94 * compression level is {@code 6}.
95 *
96 * @throws CompressionException if failed to initialize zlib
97 */
98 public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
99 this(wrapper, compressionLevel, 15, 8);
100 }
101
102 /**
103 * Creates a new zlib encoder with the specified {@code compressionLevel},
104 * the specified {@code windowBits}, the specified {@code memLevel}, and
105 * the specified wrapper.
106 *
107 * @param compressionLevel
108 * {@code 1} yields the fastest compression and {@code 9} yields the
109 * best compression. {@code 0} means no compression. The default
110 * compression level is {@code 6}.
111 * @param windowBits
112 * The base two logarithm of the size of the history buffer. The
113 * value should be in the range {@code 9} to {@code 15} inclusive.
114 * Larger values result in better compression at the expense of
115 * memory usage. The default value is {@code 15}.
116 * @param memLevel
117 * How much memory should be allocated for the internal compression
118 * state. {@code 1} uses minimum memory and {@code 9} uses maximum
119 * memory. Larger values result in better and faster compression
120 * at the expense of memory usage. The default value is {@code 8}
121 *
122 * @throws CompressionException if failed to initialize zlib
123 */
124 public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
125 if (compressionLevel < 0 || compressionLevel > 9) {
126 throw new IllegalArgumentException(
127 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
128 }
129 if (windowBits < 9 || windowBits > 15) {
130 throw new IllegalArgumentException(
131 "windowBits: " + windowBits + " (expected: 9-15)");
132 }
133 if (memLevel < 1 || memLevel > 9) {
134 throw new IllegalArgumentException(
135 "memLevel: " + memLevel + " (expected: 1-9)");
136 }
137 if (wrapper == null) {
138 throw new NullPointerException("wrapper");
139 }
140 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
141 throw new IllegalArgumentException(
142 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
143 "allowed for compression.");
144 }
145
146 synchronized (z) {
147 int resultCode = z.deflateInit(compressionLevel, windowBits, memLevel,
148 ZlibUtil.convertWrapperType(wrapper));
149 if (resultCode != JZlib.Z_OK) {
150 ZlibUtil.fail(z, "initialization failure", resultCode);
151 }
152 }
153 }
154
155 /**
156 * Creates a new zlib encoder with the default compression level ({@code 6}),
157 * default window bits ({@code 15}), default memory level ({@code 8}),
158 * and the specified preset dictionary. The wrapper is always
159 * {@link ZlibWrapper#ZLIB} because it is the only format that supports
160 * the preset dictionary.
161 *
162 * @param dictionary the preset dictionary
163 *
164 * @throws CompressionException if failed to initialize zlib
165 */
166 public ZlibEncoder(byte[] dictionary) {
167 this(6, dictionary);
168 }
169
170 /**
171 * Creates a new zlib encoder with the specified {@code compressionLevel},
172 * default window bits ({@code 15}), default memory level ({@code 8}),
173 * and the specified preset dictionary. The wrapper is always
174 * {@link ZlibWrapper#ZLIB} because it is the only format that supports
175 * the preset dictionary.
176 *
177 * @param compressionLevel
178 * {@code 1} yields the fastest compression and {@code 9} yields the
179 * best compression. {@code 0} means no compression. The default
180 * compression level is {@code 6}.
181 * @param dictionary the preset dictionary
182 *
183 * @throws CompressionException if failed to initialize zlib
184 */
185 public ZlibEncoder(int compressionLevel, byte[] dictionary) {
186 this(compressionLevel, 15, 8, dictionary);
187 }
188
189 /**
190 * Creates a new zlib encoder with the specified {@code compressionLevel},
191 * the specified {@code windowBits}, the specified {@code memLevel},
192 * and the specified preset dictionary. The wrapper is always
193 * {@link ZlibWrapper#ZLIB} because it is the only format that supports
194 * the preset dictionary.
195 *
196 * @param compressionLevel
197 * {@code 1} yields the fastest compression and {@code 9} yields the
198 * best compression. {@code 0} means no compression. The default
199 * compression level is {@code 6}.
200 * @param windowBits
201 * The base two logarithm of the size of the history buffer. The
202 * value should be in the range {@code 9} to {@code 15} inclusive.
203 * Larger values result in better compression at the expense of
204 * memory usage. The default value is {@code 15}.
205 * @param memLevel
206 * How much memory should be allocated for the internal compression
207 * state. {@code 1} uses minimum memory and {@code 9} uses maximum
208 * memory. Larger values result in better and faster compression
209 * at the expense of memory usage. The default value is {@code 8}
210 * @param dictionary the preset dictionary
211 *
212 * @throws CompressionException if failed to initialize zlib
213 */
214 public ZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
215 if (compressionLevel < 0 || compressionLevel > 9) {
216 throw new IllegalArgumentException(
217 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
218 }
219 if (windowBits < 9 || windowBits > 15) {
220 throw new IllegalArgumentException(
221 "windowBits: " + windowBits + " (expected: 9-15)");
222 }
223 if (memLevel < 1 || memLevel > 9) {
224 throw new IllegalArgumentException(
225 "memLevel: " + memLevel + " (expected: 1-9)");
226 }
227 if (dictionary == null) {
228 throw new NullPointerException("dictionary");
229 }
230
231 synchronized (z) {
232 int resultCode;
233 resultCode = z.deflateInit(compressionLevel, windowBits, memLevel,
234 JZlib.W_ZLIB); // Default: ZLIB format
235 if (resultCode != JZlib.Z_OK) {
236 ZlibUtil.fail(z, "initialization failure", resultCode);
237 } else {
238 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
239 if (resultCode != JZlib.Z_OK) {
240 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
241 }
242 }
243 }
244 }
245
246 public ChannelFuture close() {
247 ChannelHandlerContext ctx = this.ctx;
248 if (ctx == null) {
249 throw new IllegalStateException("not added to a pipeline");
250 }
251 return finishEncode(ctx, null);
252 }
253
254 public boolean isClosed() {
255 return finished.get();
256 }
257
258 @Override
259 protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
260 if (!(msg instanceof ChannelBuffer) || finished.get()) {
261 return msg;
262 }
263
264 ChannelBuffer result;
265 synchronized (z) {
266 try {
267 // Configure input.
268 ChannelBuffer uncompressed = (ChannelBuffer) msg;
269 byte[] in = new byte[uncompressed.readableBytes()];
270 uncompressed.readBytes(in);
271 z.next_in = in;
272 z.next_in_index = 0;
273 z.avail_in = in.length;
274
275 // Configure output.
276 byte[] out = new byte[(int) Math.ceil(in.length * 1.001) + 12];
277 z.next_out = out;
278 z.next_out_index = 0;
279 z.avail_out = out.length;
280
281 // Note that Z_PARTIAL_FLUSH has been deprecated.
282 int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
283 if (resultCode != JZlib.Z_OK) {
284 ZlibUtil.fail(z, "compression failure", resultCode);
285 }
286
287 if (z.next_out_index != 0) {
288 result = ctx.getChannel().getConfig().getBufferFactory().getBuffer(
289 uncompressed.order(), out, 0, z.next_out_index);
290 } else {
291 result = ChannelBuffers.EMPTY_BUFFER;
292 }
293 } finally {
294 // Deference the external references explicitly to tell the VM that
295 // the allocated byte arrays are temporary so that the call stack
296 // can be utilized.
297 // I'm not sure if the modern VMs do this optimization though.
298 z.next_in = null;
299 z.next_out = null;
300 }
301 }
302
303 return result;
304 }
305
306 @Override
307 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
308 throws Exception {
309 if (evt instanceof ChannelStateEvent) {
310 ChannelStateEvent e = (ChannelStateEvent) evt;
311 switch (e.getState()) {
312 case OPEN:
313 case CONNECTED:
314 case BOUND:
315 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
316 finishEncode(ctx, evt);
317 return;
318 }
319 }
320 }
321
322 super.handleDownstream(ctx, evt);
323 }
324
325 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
326 if (!finished.compareAndSet(false, true)) {
327 if (evt != null) {
328 ctx.sendDownstream(evt);
329 }
330 return Channels.succeededFuture(ctx.getChannel());
331 }
332
333 ChannelBuffer footer;
334 ChannelFuture future;
335 synchronized (z) {
336 try {
337 // Configure input.
338 z.next_in = EMPTY_ARRAY;
339 z.next_in_index = 0;
340 z.avail_in = 0;
341
342 // Configure output.
343 byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header
344 z.next_out = out;
345 z.next_out_index = 0;
346 z.avail_out = out.length;
347
348 // Write the ADLER32 checksum (stream footer).
349 int resultCode = z.deflate(JZlib.Z_FINISH);
350 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
351 future = Channels.failedFuture(
352 ctx.getChannel(),
353 ZlibUtil.exception(z, "compression failure", resultCode));
354 footer = null;
355 } else if (z.next_out_index != 0) {
356 future = Channels.future(ctx.getChannel());
357 footer =
358 ctx.getChannel().getConfig().getBufferFactory().getBuffer(
359 out, 0, z.next_out_index);
360 } else {
361 // Note that we should never use a SucceededChannelFuture
362 // here just in case any downstream handler or a sink wants
363 // to notify a write error.
364 future = Channels.future(ctx.getChannel());
365 footer = ChannelBuffers.EMPTY_BUFFER;
366 }
367 } finally {
368 z.deflateEnd();
369
370 // Deference the external references explicitly to tell the VM that
371 // the allocated byte arrays are temporary so that the call stack
372 // can be utilized.
373 // I'm not sure if the modern VMs do this optimization though.
374 z.next_in = null;
375 z.next_out = null;
376 }
377 }
378
379 if (footer != null) {
380 Channels.write(ctx, future, footer);
381 }
382
383 if (evt != null) {
384 future.addListener(new ChannelFutureListener() {
385 public void operationComplete(ChannelFuture future) throws Exception {
386 ctx.sendDownstream(evt);
387 }
388 });
389 }
390
391 return future;
392 }
393
394 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
395 this.ctx = ctx;
396 }
397
398 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
399 // Unused
400 }
401
402 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
403 // Unused
404 }
405
406 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
407 // Unused
408 }
409 }