View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.compression;
17  
18  import java.util.concurrent.atomic.AtomicBoolean;
19  
20  import org.jboss.netty.buffer.ChannelBuffer;
21  import org.jboss.netty.buffer.ChannelBuffers;
22  import org.jboss.netty.channel.Channel;
23  import org.jboss.netty.channel.ChannelEvent;
24  import org.jboss.netty.channel.ChannelFuture;
25  import org.jboss.netty.channel.ChannelFutureListener;
26  import org.jboss.netty.channel.ChannelHandlerContext;
27  import org.jboss.netty.channel.ChannelStateEvent;
28  import org.jboss.netty.channel.Channels;
29  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
30  import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder;
31  import org.jboss.netty.util.internal.jzlib.JZlib;
32  import org.jboss.netty.util.internal.jzlib.ZStream;
33  
34  
35  /**
36   * Compresses a {@link ChannelBuffer} using the deflate algorithm.
37   * @apiviz.landmark
38   * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper
39   */
40  public class ZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler {
41  
42      private static final byte[] EMPTY_ARRAY = new byte[0];
43  
44      private final ZStream z = new ZStream();
45      private final AtomicBoolean finished = new AtomicBoolean();
46      private volatile ChannelHandlerContext ctx;
47  
48      /**
49       * Creates a new zlib encoder with the default compression level ({@code 6}),
50       * default window bits ({@code 15}), default memory level ({@code 8}),
51       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
52       *
53       * @throws CompressionException if failed to initialize zlib
54       */
55      public ZlibEncoder() {
56          this(6);
57      }
58  
59      /**
60       * Creates a new zlib encoder with the specified {@code compressionLevel},
61       * default window bits ({@code 15}), default memory level ({@code 8}),
62       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
63       *
64       * @param compressionLevel
65       *        {@code 1} yields the fastest compression and {@code 9} yields the
66       *        best compression.  {@code 0} means no compression.  The default
67       *        compression level is {@code 6}.
68       *
69       * @throws CompressionException if failed to initialize zlib
70       */
71      public ZlibEncoder(int compressionLevel) {
72          this(ZlibWrapper.ZLIB, compressionLevel);
73      }
74  
75      /**
76       * Creates a new zlib encoder with the default compression level ({@code 6}),
77       * default window bits ({@code 15}), default memory level ({@code 8}),
78       * and the specified wrapper.
79       *
80       * @throws CompressionException if failed to initialize zlib
81       */
82      public ZlibEncoder(ZlibWrapper wrapper) {
83          this(wrapper, 6);
84      }
85  
86      /**
87       * Creates a new zlib encoder with the specified {@code compressionLevel},
88       * default window bits ({@code 15}), default memory level ({@code 8}),
89       * and the specified wrapper.
90       *
91       * @param compressionLevel
92       *        {@code 1} yields the fastest compression and {@code 9} yields the
93       *        best compression.  {@code 0} means no compression.  The default
94       *        compression level is {@code 6}.
95       *
96       * @throws CompressionException if failed to initialize zlib
97       */
98      public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
99          this(wrapper, compressionLevel, 15, 8);
100     }
101 
102     /**
103      * Creates a new zlib encoder with the specified {@code compressionLevel},
104      * the specified {@code windowBits}, the specified {@code memLevel}, and
105      * the specified wrapper.
106      *
107      * @param compressionLevel
108      *        {@code 1} yields the fastest compression and {@code 9} yields the
109      *        best compression.  {@code 0} means no compression.  The default
110      *        compression level is {@code 6}.
111      * @param windowBits
112      *        The base two logarithm of the size of the history buffer.  The
113      *        value should be in the range {@code 9} to {@code 15} inclusive.
114      *        Larger values result in better compression at the expense of
115      *        memory usage.  The default value is {@code 15}.
116      * @param memLevel
117      *        How much memory should be allocated for the internal compression
118      *        state.  {@code 1} uses minimum memory and {@code 9} uses maximum
119      *        memory.  Larger values result in better and faster compression
120      *        at the expense of memory usage.  The default value is {@code 8}
121      *
122      * @throws CompressionException if failed to initialize zlib
123      */
124     public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
125         if (compressionLevel < 0 || compressionLevel > 9) {
126             throw new IllegalArgumentException(
127                     "compressionLevel: " + compressionLevel + " (expected: 0-9)");
128         }
129         if (windowBits < 9 || windowBits > 15) {
130             throw new IllegalArgumentException(
131                     "windowBits: " + windowBits + " (expected: 9-15)");
132         }
133         if (memLevel < 1 || memLevel > 9) {
134             throw new IllegalArgumentException(
135                     "memLevel: " + memLevel + " (expected: 1-9)");
136         }
137         if (wrapper == null) {
138             throw new NullPointerException("wrapper");
139         }
140         if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
141             throw new IllegalArgumentException(
142                     "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
143                     "allowed for compression.");
144         }
145 
146         synchronized (z) {
147             int resultCode = z.deflateInit(compressionLevel, windowBits, memLevel,
148                     ZlibUtil.convertWrapperType(wrapper));
149             if (resultCode != JZlib.Z_OK) {
150                 ZlibUtil.fail(z, "initialization failure", resultCode);
151             }
152         }
153     }
154 
155     /**
156      * Creates a new zlib encoder with the default compression level ({@code 6}),
157      * default window bits ({@code 15}), default memory level ({@code 8}),
158      * and the specified preset dictionary.  The wrapper is always
159      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
160      * the preset dictionary.
161      *
162      * @param dictionary  the preset dictionary
163      *
164      * @throws CompressionException if failed to initialize zlib
165      */
166     public ZlibEncoder(byte[] dictionary) {
167         this(6, dictionary);
168     }
169 
170     /**
171      * Creates a new zlib encoder with the specified {@code compressionLevel},
172      * default window bits ({@code 15}), default memory level ({@code 8}),
173      * and the specified preset dictionary.  The wrapper is always
174      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
175      * the preset dictionary.
176      *
177      * @param compressionLevel
178      *        {@code 1} yields the fastest compression and {@code 9} yields the
179      *        best compression.  {@code 0} means no compression.  The default
180      *        compression level is {@code 6}.
181      * @param dictionary  the preset dictionary
182      *
183      * @throws CompressionException if failed to initialize zlib
184      */
185     public ZlibEncoder(int compressionLevel, byte[] dictionary) {
186         this(compressionLevel, 15, 8, dictionary);
187     }
188 
189     /**
190      * Creates a new zlib encoder with the specified {@code compressionLevel},
191      * the specified {@code windowBits}, the specified {@code memLevel},
192      * and the specified preset dictionary.  The wrapper is always
193      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
194      * the preset dictionary.
195      *
196      * @param compressionLevel
197      *        {@code 1} yields the fastest compression and {@code 9} yields the
198      *        best compression.  {@code 0} means no compression.  The default
199      *        compression level is {@code 6}.
200      * @param windowBits
201      *        The base two logarithm of the size of the history buffer.  The
202      *        value should be in the range {@code 9} to {@code 15} inclusive.
203      *        Larger values result in better compression at the expense of
204      *        memory usage.  The default value is {@code 15}.
205      * @param memLevel
206      *        How much memory should be allocated for the internal compression
207      *        state.  {@code 1} uses minimum memory and {@code 9} uses maximum
208      *        memory.  Larger values result in better and faster compression
209      *        at the expense of memory usage.  The default value is {@code 8}
210      * @param dictionary  the preset dictionary
211      *
212      * @throws CompressionException if failed to initialize zlib
213      */
214     public ZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
215         if (compressionLevel < 0 || compressionLevel > 9) {
216             throw new IllegalArgumentException(
217                     "compressionLevel: " + compressionLevel + " (expected: 0-9)");
218         }
219         if (windowBits < 9 || windowBits > 15) {
220             throw new IllegalArgumentException(
221                     "windowBits: " + windowBits + " (expected: 9-15)");
222         }
223         if (memLevel < 1 || memLevel > 9) {
224             throw new IllegalArgumentException(
225                     "memLevel: " + memLevel + " (expected: 1-9)");
226         }
227         if (dictionary == null) {
228             throw new NullPointerException("dictionary");
229         }
230 
231         synchronized (z) {
232             int resultCode;
233             resultCode = z.deflateInit(compressionLevel, windowBits, memLevel,
234                     JZlib.W_ZLIB); // Default: ZLIB format
235             if (resultCode != JZlib.Z_OK) {
236                 ZlibUtil.fail(z, "initialization failure", resultCode);
237             } else {
238                 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
239                 if (resultCode != JZlib.Z_OK) {
240                     ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
241                 }
242             }
243         }
244     }
245 
246     public ChannelFuture close() {
247         ChannelHandlerContext ctx = this.ctx;
248         if (ctx == null) {
249             throw new IllegalStateException("not added to a pipeline");
250         }
251         return finishEncode(ctx, null);
252     }
253 
254     public boolean isClosed() {
255         return finished.get();
256     }
257 
258     @Override
259     protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
260         if (!(msg instanceof ChannelBuffer) || finished.get()) {
261             return msg;
262         }
263 
264         ChannelBuffer result;
265         synchronized (z) {
266             try {
267                 // Configure input.
268                 ChannelBuffer uncompressed = (ChannelBuffer) msg;
269                 byte[] in = new byte[uncompressed.readableBytes()];
270                 uncompressed.readBytes(in);
271                 z.next_in = in;
272                 z.next_in_index = 0;
273                 z.avail_in = in.length;
274 
275                 // Configure output.
276                 byte[] out = new byte[(int) Math.ceil(in.length * 1.001) + 12];
277                 z.next_out = out;
278                 z.next_out_index = 0;
279                 z.avail_out = out.length;
280 
281                 // Note that Z_PARTIAL_FLUSH has been deprecated.
282                 int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
283                 if (resultCode != JZlib.Z_OK) {
284                     ZlibUtil.fail(z, "compression failure", resultCode);
285                 }
286 
287                 if (z.next_out_index != 0) {
288                     result = ctx.getChannel().getConfig().getBufferFactory().getBuffer(
289                             uncompressed.order(), out, 0, z.next_out_index);
290                 } else {
291                     result = ChannelBuffers.EMPTY_BUFFER;
292                 }
293             } finally {
294                 // Deference the external references explicitly to tell the VM that
295                 // the allocated byte arrays are temporary so that the call stack
296                 // can be utilized.
297                 // I'm not sure if the modern VMs do this optimization though.
298                 z.next_in = null;
299                 z.next_out = null;
300             }
301         }
302 
303         return result;
304     }
305 
306     @Override
307     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
308             throws Exception {
309         if (evt instanceof ChannelStateEvent) {
310             ChannelStateEvent e = (ChannelStateEvent) evt;
311             switch (e.getState()) {
312             case OPEN:
313             case CONNECTED:
314             case BOUND:
315                 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
316                     finishEncode(ctx, evt);
317                     return;
318                 }
319             }
320         }
321 
322         super.handleDownstream(ctx, evt);
323     }
324 
325     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
326         if (!finished.compareAndSet(false, true)) {
327             if (evt != null) {
328                 ctx.sendDownstream(evt);
329             }
330             return Channels.succeededFuture(ctx.getChannel());
331         }
332 
333         ChannelBuffer footer;
334         ChannelFuture future;
335         synchronized (z) {
336             try {
337                 // Configure input.
338                 z.next_in = EMPTY_ARRAY;
339                 z.next_in_index = 0;
340                 z.avail_in = 0;
341 
342                 // Configure output.
343                 byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header
344                 z.next_out = out;
345                 z.next_out_index = 0;
346                 z.avail_out = out.length;
347 
348                 // Write the ADLER32 checksum (stream footer).
349                 int resultCode = z.deflate(JZlib.Z_FINISH);
350                 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
351                     future = Channels.failedFuture(
352                             ctx.getChannel(),
353                             ZlibUtil.exception(z, "compression failure", resultCode));
354                     footer = null;
355                 } else if (z.next_out_index != 0) {
356                     future = Channels.future(ctx.getChannel());
357                     footer =
358                         ctx.getChannel().getConfig().getBufferFactory().getBuffer(
359                                 out, 0, z.next_out_index);
360                 } else {
361                     // Note that we should never use a SucceededChannelFuture
362                     // here just in case any downstream handler or a sink wants
363                     // to notify a write error.
364                     future = Channels.future(ctx.getChannel());
365                     footer = ChannelBuffers.EMPTY_BUFFER;
366                 }
367             } finally {
368                 z.deflateEnd();
369 
370                 // Deference the external references explicitly to tell the VM that
371                 // the allocated byte arrays are temporary so that the call stack
372                 // can be utilized.
373                 // I'm not sure if the modern VMs do this optimization though.
374                 z.next_in = null;
375                 z.next_out = null;
376             }
377         }
378 
379         if (footer != null) {
380             Channels.write(ctx, future, footer);
381         }
382 
383         if (evt != null) {
384             future.addListener(new ChannelFutureListener() {
385                 public void operationComplete(ChannelFuture future) throws Exception {
386                     ctx.sendDownstream(evt);
387                 }
388             });
389         }
390 
391         return future;
392     }
393 
394     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
395         this.ctx = ctx;
396     }
397 
398     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
399         // Unused
400     }
401 
402     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
403         // Unused
404     }
405 
406     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
407         // Unused
408     }
409 }