View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import com.jcraft.jzlib.Deflater;
19  import com.jcraft.jzlib.JZlib;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.util.concurrent.EventExecutor;
27  import io.netty.util.concurrent.Future;
28  import io.netty.util.concurrent.PromiseNotifier;
29  import io.netty.util.internal.EmptyArrays;
30  import io.netty.util.internal.ObjectUtil;
31  
32  import java.util.concurrent.TimeUnit;
33  
34  /**
35   * Compresses a {@link ByteBuf} using the deflate algorithm.
36   */
37  public class JZlibEncoder extends ZlibEncoder {
38  
39      private final int wrapperOverhead;
40      private final Deflater z = new Deflater();
41      private volatile boolean finished;
42      private volatile ChannelHandlerContext ctx;
43  
44      private static final int THREAD_POOL_DELAY_SECONDS = 10;
45  
46      /**
47       * Creates a new zlib encoder with the default compression level ({@code 6}),
48       * default window bits ({@code 15}), default memory level ({@code 8}),
49       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
50       *
51       * @throws CompressionException if failed to initialize zlib
52       */
53      public JZlibEncoder() {
54          this(6);
55      }
56  
57      /**
58       * Creates a new zlib encoder with the specified {@code compressionLevel},
59       * default window bits ({@code 15}), default memory level ({@code 8}),
60       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
61       *
62       * @param compressionLevel
63       *        {@code 1} yields the fastest compression and {@code 9} yields the
64       *        best compression.  {@code 0} means no compression.  The default
65       *        compression level is {@code 6}.
66       *
67       * @throws CompressionException if failed to initialize zlib
68       */
69      public JZlibEncoder(int compressionLevel) {
70          this(ZlibWrapper.ZLIB, compressionLevel);
71      }
72  
73      /**
74       * Creates a new zlib encoder with the default compression level ({@code 6}),
75       * default window bits ({@code 15}), default memory level ({@code 8}),
76       * and the specified wrapper.
77       *
78       * @throws CompressionException if failed to initialize zlib
79       */
80      public JZlibEncoder(ZlibWrapper wrapper) {
81          this(wrapper, 6);
82      }
83  
84      /**
85       * Creates a new zlib encoder with the specified {@code compressionLevel},
86       * default window bits ({@code 15}), default memory level ({@code 8}),
87       * and the specified wrapper.
88       *
89       * @param compressionLevel
90       *        {@code 1} yields the fastest compression and {@code 9} yields the
91       *        best compression.  {@code 0} means no compression.  The default
92       *        compression level is {@code 6}.
93       *
94       * @throws CompressionException if failed to initialize zlib
95       */
96      public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
97          this(wrapper, compressionLevel, 15, 8);
98      }
99  
100     /**
101      * Creates a new zlib encoder with the specified {@code compressionLevel},
102      * the specified {@code windowBits}, the specified {@code memLevel}, and
103      * the specified wrapper.
104      *
105      * @param compressionLevel
106      *        {@code 1} yields the fastest compression and {@code 9} yields the
107      *        best compression.  {@code 0} means no compression.  The default
108      *        compression level is {@code 6}.
109      * @param windowBits
110      *        The base two logarithm of the size of the history buffer.  The
111      *        value should be in the range {@code 9} to {@code 15} inclusive.
112      *        Larger values result in better compression at the expense of
113      *        memory usage.  The default value is {@code 15}.
114      * @param memLevel
115      *        How much memory should be allocated for the internal compression
116      *        state.  {@code 1} uses minimum memory and {@code 9} uses maximum
117      *        memory.  Larger values result in better and faster compression
118      *        at the expense of memory usage.  The default value is {@code 8}
119      *
120      * @throws CompressionException if failed to initialize zlib
121      */
122     public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
123         ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
124         ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
125         ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
126         ObjectUtil.checkNotNull(wrapper, "wrapper");
127 
128         if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
129             throw new IllegalArgumentException(
130                     "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
131                     "allowed for compression.");
132         }
133 
134         int resultCode = z.init(
135                 compressionLevel, windowBits, memLevel,
136                 ZlibUtil.convertWrapperType(wrapper));
137         if (resultCode != JZlib.Z_OK) {
138             ZlibUtil.fail(z, "initialization failure", resultCode);
139         }
140 
141         wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper);
142     }
143 
144     /**
145      * Creates a new zlib encoder with the default compression level ({@code 6}),
146      * default window bits ({@code 15}), default memory level ({@code 8}),
147      * and the specified preset dictionary.  The wrapper is always
148      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
149      * the preset dictionary.
150      *
151      * @param dictionary  the preset dictionary
152      *
153      * @throws CompressionException if failed to initialize zlib
154      */
155     public JZlibEncoder(byte[] dictionary) {
156         this(6, dictionary);
157     }
158 
159     /**
160      * Creates a new zlib encoder with the specified {@code compressionLevel},
161      * default window bits ({@code 15}), default memory level ({@code 8}),
162      * and the specified preset dictionary.  The wrapper is always
163      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
164      * the preset dictionary.
165      *
166      * @param compressionLevel
167      *        {@code 1} yields the fastest compression and {@code 9} yields the
168      *        best compression.  {@code 0} means no compression.  The default
169      *        compression level is {@code 6}.
170      * @param dictionary  the preset dictionary
171      *
172      * @throws CompressionException if failed to initialize zlib
173      */
174     public JZlibEncoder(int compressionLevel, byte[] dictionary) {
175         this(compressionLevel, 15, 8, dictionary);
176     }
177 
178     /**
179      * Creates a new zlib encoder with the specified {@code compressionLevel},
180      * the specified {@code windowBits}, the specified {@code memLevel},
181      * and the specified preset dictionary.  The wrapper is always
182      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
183      * the preset dictionary.
184      *
185      * @param compressionLevel
186      *        {@code 1} yields the fastest compression and {@code 9} yields the
187      *        best compression.  {@code 0} means no compression.  The default
188      *        compression level is {@code 6}.
189      * @param windowBits
190      *        The base two logarithm of the size of the history buffer.  The
191      *        value should be in the range {@code 9} to {@code 15} inclusive.
192      *        Larger values result in better compression at the expense of
193      *        memory usage.  The default value is {@code 15}.
194      * @param memLevel
195      *        How much memory should be allocated for the internal compression
196      *        state.  {@code 1} uses minimum memory and {@code 9} uses maximum
197      *        memory.  Larger values result in better and faster compression
198      *        at the expense of memory usage.  The default value is {@code 8}
199      * @param dictionary  the preset dictionary
200      *
201      * @throws CompressionException if failed to initialize zlib
202      */
203     public JZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
204         ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
205         ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
206         ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
207         ObjectUtil.checkNotNull(dictionary, "dictionary");
208 
209         int resultCode;
210         resultCode = z.deflateInit(
211                 compressionLevel, windowBits, memLevel,
212                 JZlib.W_ZLIB); // Default: ZLIB format
213         if (resultCode != JZlib.Z_OK) {
214             ZlibUtil.fail(z, "initialization failure", resultCode);
215         } else {
216             resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
217             if (resultCode != JZlib.Z_OK) {
218                 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
219             }
220         }
221 
222         wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB);
223     }
224 
225     @Override
226     public ChannelFuture close() {
227         return close(ctx().channel().newPromise());
228     }
229 
230     @Override
231     public ChannelFuture close(final ChannelPromise promise) {
232         ChannelHandlerContext ctx = ctx();
233         EventExecutor executor = ctx.executor();
234         if (executor.inEventLoop()) {
235             return finishEncode(ctx, promise);
236         } else {
237             final ChannelPromise p = ctx.newPromise();
238             executor.execute(new Runnable() {
239                 @Override
240                 public void run() {
241                     ChannelFuture f = finishEncode(ctx(), p);
242                     PromiseNotifier.cascade(f, promise);
243                 }
244             });
245             return p;
246         }
247     }
248 
249     private ChannelHandlerContext ctx() {
250         ChannelHandlerContext ctx = this.ctx;
251         if (ctx == null) {
252             throw new IllegalStateException("not added to a pipeline");
253         }
254         return ctx;
255     }
256 
257     @Override
258     public boolean isClosed() {
259         return finished;
260     }
261 
262     @Override
263     protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
264         if (finished) {
265             out.writeBytes(in);
266             return;
267         }
268 
269         int inputLength = in.readableBytes();
270         if (inputLength == 0) {
271             return;
272         }
273 
274         try {
275             // Configure input.
276             boolean inHasArray = in.hasArray();
277             z.avail_in = inputLength;
278             if (inHasArray) {
279                 z.next_in = in.array();
280                 z.next_in_index = in.arrayOffset() + in.readerIndex();
281             } else {
282                 byte[] array = new byte[inputLength];
283                 in.getBytes(in.readerIndex(), array);
284                 z.next_in = array;
285                 z.next_in_index = 0;
286             }
287             int oldNextInIndex = z.next_in_index;
288 
289             // Configure output.
290             int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12 + wrapperOverhead;
291             out.ensureWritable(maxOutputLength);
292             z.avail_out = maxOutputLength;
293             z.next_out = out.array();
294             z.next_out_index = out.arrayOffset() + out.writerIndex();
295             int oldNextOutIndex = z.next_out_index;
296 
297             // Note that Z_PARTIAL_FLUSH has been deprecated.
298             int resultCode;
299             try {
300                 resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
301             } finally {
302                 in.skipBytes(z.next_in_index - oldNextInIndex);
303             }
304 
305             if (resultCode != JZlib.Z_OK) {
306                 ZlibUtil.fail(z, "compression failure", resultCode);
307             }
308 
309             int outputLength = z.next_out_index - oldNextOutIndex;
310             if (outputLength > 0) {
311                 out.writerIndex(out.writerIndex() + outputLength);
312             }
313         } finally {
314             // Deference the external references explicitly to tell the VM that
315             // the allocated byte arrays are temporary so that the call stack
316             // can be utilized.
317             // I'm not sure if the modern VMs do this optimization though.
318             z.next_in = null;
319             z.next_out = null;
320         }
321     }
322 
323     @Override
324     public void close(
325             final ChannelHandlerContext ctx,
326             final ChannelPromise promise) {
327         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
328 
329         if (!f.isDone()) {
330             // Ensure the channel is closed even if the write operation completes in time.
331             final Future<?> future = ctx.executor().schedule(new Runnable() {
332                 @Override
333                 public void run() {
334                     if (!promise.isDone()) {
335                         ctx.close(promise);
336                     }
337                 }
338             }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);
339 
340             f.addListener(new ChannelFutureListener() {
341                 @Override
342                 public void operationComplete(ChannelFuture f) {
343                     // Cancel the scheduled timeout.
344                     future.cancel(true);
345                     if (!promise.isDone()) {
346                         ctx.close(promise);
347                     }
348                 }
349             });
350         } else {
351             ctx.close(promise);
352         }
353     }
354 
355     private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) {
356         if (finished) {
357             promise.setSuccess();
358             return promise;
359         }
360         finished = true;
361 
362         ByteBuf footer;
363         try {
364             // Configure input.
365             z.next_in = EmptyArrays.EMPTY_BYTES;
366             z.next_in_index = 0;
367             z.avail_in = 0;
368 
369             // Configure output.
370             byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header
371             z.next_out = out;
372             z.next_out_index = 0;
373             z.avail_out = out.length;
374 
375             // Write the ADLER32 checksum (stream footer).
376             int resultCode = z.deflate(JZlib.Z_FINISH);
377             if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
378                 promise.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode));
379                 return promise;
380             } else if (z.next_out_index != 0) {
381                 // Suppressed a warning above to be on the safe side
382                 // even if z.next_out_index seems to be always 0 here
383                 footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index);
384             } else {
385                 footer = Unpooled.EMPTY_BUFFER;
386             }
387         } finally {
388             z.deflateEnd();
389 
390             // Deference the external references explicitly to tell the VM that
391             // the allocated byte arrays are temporary so that the call stack
392             // can be utilized.
393             // I'm not sure if the modern VMs do this optimization though.
394             z.next_in = null;
395             z.next_out = null;
396         }
397         return ctx.writeAndFlush(footer, promise);
398     }
399 
400     @Override
401     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
402         this.ctx = ctx;
403     }
404 }