View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import com.jcraft.jzlib.Deflater;
19  import com.jcraft.jzlib.JZlib;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.util.concurrent.EventExecutor;
27  import io.netty.util.concurrent.PromiseNotifier;
28  import io.netty.util.internal.EmptyArrays;
29  import io.netty.util.internal.ObjectUtil;
30  
31  import java.util.concurrent.TimeUnit;
32  
33  /**
34   * Compresses a {@link ByteBuf} using the deflate algorithm.
35   */
36  public class JZlibEncoder extends ZlibEncoder {
37  
38      private final int wrapperOverhead;
39      private final Deflater z = new Deflater();
40      private volatile boolean finished;
41      private volatile ChannelHandlerContext ctx;
42  
43      private static final int THREAD_POOL_DELAY_SECONDS = 10;
44  
45      /**
46       * Creates a new zlib encoder with the default compression level ({@code 6}),
47       * default window bits ({@code 15}), default memory level ({@code 8}),
48       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
49       *
50       * @throws CompressionException if failed to initialize zlib
51       */
52      public JZlibEncoder() {
53          this(6);
54      }
55  
56      /**
57       * Creates a new zlib encoder with the specified {@code compressionLevel},
58       * default window bits ({@code 15}), default memory level ({@code 8}),
59       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
60       *
61       * @param compressionLevel
62       *        {@code 1} yields the fastest compression and {@code 9} yields the
63       *        best compression.  {@code 0} means no compression.  The default
64       *        compression level is {@code 6}.
65       *
66       * @throws CompressionException if failed to initialize zlib
67       */
68      public JZlibEncoder(int compressionLevel) {
69          this(ZlibWrapper.ZLIB, compressionLevel);
70      }
71  
72      /**
73       * Creates a new zlib encoder with the default compression level ({@code 6}),
74       * default window bits ({@code 15}), default memory level ({@code 8}),
75       * and the specified wrapper.
76       *
77       * @throws CompressionException if failed to initialize zlib
78       */
79      public JZlibEncoder(ZlibWrapper wrapper) {
80          this(wrapper, 6);
81      }
82  
83      /**
84       * Creates a new zlib encoder with the specified {@code compressionLevel},
85       * default window bits ({@code 15}), default memory level ({@code 8}),
86       * and the specified wrapper.
87       *
88       * @param compressionLevel
89       *        {@code 1} yields the fastest compression and {@code 9} yields the
90       *        best compression.  {@code 0} means no compression.  The default
91       *        compression level is {@code 6}.
92       *
93       * @throws CompressionException if failed to initialize zlib
94       */
95      public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
96          this(wrapper, compressionLevel, 15, 8);
97      }
98  
99      /**
100      * Creates a new zlib encoder with the specified {@code compressionLevel},
101      * the specified {@code windowBits}, the specified {@code memLevel}, and
102      * the specified wrapper.
103      *
104      * @param compressionLevel
105      *        {@code 1} yields the fastest compression and {@code 9} yields the
106      *        best compression.  {@code 0} means no compression.  The default
107      *        compression level is {@code 6}.
108      * @param windowBits
109      *        The base two logarithm of the size of the history buffer.  The
110      *        value should be in the range {@code 9} to {@code 15} inclusive.
111      *        Larger values result in better compression at the expense of
112      *        memory usage.  The default value is {@code 15}.
113      * @param memLevel
114      *        How much memory should be allocated for the internal compression
115      *        state.  {@code 1} uses minimum memory and {@code 9} uses maximum
116      *        memory.  Larger values result in better and faster compression
117      *        at the expense of memory usage.  The default value is {@code 8}
118      *
119      * @throws CompressionException if failed to initialize zlib
120      */
121     public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
122         ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
123         ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
124         ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
125         ObjectUtil.checkNotNull(wrapper, "wrapper");
126 
127         if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
128             throw new IllegalArgumentException(
129                     "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
130                     "allowed for compression.");
131         }
132 
133         int resultCode = z.init(
134                 compressionLevel, windowBits, memLevel,
135                 ZlibUtil.convertWrapperType(wrapper));
136         if (resultCode != JZlib.Z_OK) {
137             ZlibUtil.fail(z, "initialization failure", resultCode);
138         }
139 
140         wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper);
141     }
142 
143     /**
144      * Creates a new zlib encoder with the default compression level ({@code 6}),
145      * default window bits ({@code 15}), default memory level ({@code 8}),
146      * and the specified preset dictionary.  The wrapper is always
147      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
148      * the preset dictionary.
149      *
150      * @param dictionary  the preset dictionary
151      *
152      * @throws CompressionException if failed to initialize zlib
153      */
154     public JZlibEncoder(byte[] dictionary) {
155         this(6, dictionary);
156     }
157 
158     /**
159      * Creates a new zlib encoder with the specified {@code compressionLevel},
160      * default window bits ({@code 15}), default memory level ({@code 8}),
161      * and the specified preset dictionary.  The wrapper is always
162      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
163      * the preset dictionary.
164      *
165      * @param compressionLevel
166      *        {@code 1} yields the fastest compression and {@code 9} yields the
167      *        best compression.  {@code 0} means no compression.  The default
168      *        compression level is {@code 6}.
169      * @param dictionary  the preset dictionary
170      *
171      * @throws CompressionException if failed to initialize zlib
172      */
173     public JZlibEncoder(int compressionLevel, byte[] dictionary) {
174         this(compressionLevel, 15, 8, dictionary);
175     }
176 
177     /**
178      * Creates a new zlib encoder with the specified {@code compressionLevel},
179      * the specified {@code windowBits}, the specified {@code memLevel},
180      * and the specified preset dictionary.  The wrapper is always
181      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
182      * the preset dictionary.
183      *
184      * @param compressionLevel
185      *        {@code 1} yields the fastest compression and {@code 9} yields the
186      *        best compression.  {@code 0} means no compression.  The default
187      *        compression level is {@code 6}.
188      * @param windowBits
189      *        The base two logarithm of the size of the history buffer.  The
190      *        value should be in the range {@code 9} to {@code 15} inclusive.
191      *        Larger values result in better compression at the expense of
192      *        memory usage.  The default value is {@code 15}.
193      * @param memLevel
194      *        How much memory should be allocated for the internal compression
195      *        state.  {@code 1} uses minimum memory and {@code 9} uses maximum
196      *        memory.  Larger values result in better and faster compression
197      *        at the expense of memory usage.  The default value is {@code 8}
198      * @param dictionary  the preset dictionary
199      *
200      * @throws CompressionException if failed to initialize zlib
201      */
202     public JZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
203         ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
204         ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
205         ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
206         ObjectUtil.checkNotNull(dictionary, "dictionary");
207 
208         int resultCode;
209         resultCode = z.deflateInit(
210                 compressionLevel, windowBits, memLevel,
211                 JZlib.W_ZLIB); // Default: ZLIB format
212         if (resultCode != JZlib.Z_OK) {
213             ZlibUtil.fail(z, "initialization failure", resultCode);
214         } else {
215             resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
216             if (resultCode != JZlib.Z_OK) {
217                 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
218             }
219         }
220 
221         wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB);
222     }
223 
224     @Override
225     public ChannelFuture close() {
226         return close(ctx().channel().newPromise());
227     }
228 
229     @Override
230     public ChannelFuture close(final ChannelPromise promise) {
231         ChannelHandlerContext ctx = ctx();
232         EventExecutor executor = ctx.executor();
233         if (executor.inEventLoop()) {
234             return finishEncode(ctx, promise);
235         } else {
236             final ChannelPromise p = ctx.newPromise();
237             executor.execute(new Runnable() {
238                 @Override
239                 public void run() {
240                     ChannelFuture f = finishEncode(ctx(), p);
241                     PromiseNotifier.cascade(f, promise);
242                 }
243             });
244             return p;
245         }
246     }
247 
248     private ChannelHandlerContext ctx() {
249         ChannelHandlerContext ctx = this.ctx;
250         if (ctx == null) {
251             throw new IllegalStateException("not added to a pipeline");
252         }
253         return ctx;
254     }
255 
256     @Override
257     public boolean isClosed() {
258         return finished;
259     }
260 
261     @Override
262     protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
263         if (finished) {
264             out.writeBytes(in);
265             return;
266         }
267 
268         int inputLength = in.readableBytes();
269         if (inputLength == 0) {
270             return;
271         }
272 
273         try {
274             // Configure input.
275             boolean inHasArray = in.hasArray();
276             z.avail_in = inputLength;
277             if (inHasArray) {
278                 z.next_in = in.array();
279                 z.next_in_index = in.arrayOffset() + in.readerIndex();
280             } else {
281                 byte[] array = new byte[inputLength];
282                 in.getBytes(in.readerIndex(), array);
283                 z.next_in = array;
284                 z.next_in_index = 0;
285             }
286             int oldNextInIndex = z.next_in_index;
287 
288             // Configure output.
289             int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12 + wrapperOverhead;
290             out.ensureWritable(maxOutputLength);
291             z.avail_out = maxOutputLength;
292             z.next_out = out.array();
293             z.next_out_index = out.arrayOffset() + out.writerIndex();
294             int oldNextOutIndex = z.next_out_index;
295 
296             // Note that Z_PARTIAL_FLUSH has been deprecated.
297             int resultCode;
298             try {
299                 resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
300             } finally {
301                 in.skipBytes(z.next_in_index - oldNextInIndex);
302             }
303 
304             if (resultCode != JZlib.Z_OK) {
305                 ZlibUtil.fail(z, "compression failure", resultCode);
306             }
307 
308             int outputLength = z.next_out_index - oldNextOutIndex;
309             if (outputLength > 0) {
310                 out.writerIndex(out.writerIndex() + outputLength);
311             }
312         } finally {
313             // Deference the external references explicitly to tell the VM that
314             // the allocated byte arrays are temporary so that the call stack
315             // can be utilized.
316             // I'm not sure if the modern VMs do this optimization though.
317             z.next_in = null;
318             z.next_out = null;
319         }
320     }
321 
322     @Override
323     public void close(
324             final ChannelHandlerContext ctx,
325             final ChannelPromise promise) {
326         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
327         f.addListener(new ChannelFutureListener() {
328             @Override
329             public void operationComplete(ChannelFuture f) throws Exception {
330                 ctx.close(promise);
331             }
332         });
333 
334         if (!f.isDone()) {
335             // Ensure the channel is closed even if the write operation completes in time.
336             ctx.executor().schedule(new Runnable() {
337                 @Override
338                 public void run() {
339                     ctx.close(promise);
340                 }
341             }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);
342         }
343     }
344 
345     private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) {
346         if (finished) {
347             promise.setSuccess();
348             return promise;
349         }
350         finished = true;
351 
352         ByteBuf footer;
353         try {
354             // Configure input.
355             z.next_in = EmptyArrays.EMPTY_BYTES;
356             z.next_in_index = 0;
357             z.avail_in = 0;
358 
359             // Configure output.
360             byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header
361             z.next_out = out;
362             z.next_out_index = 0;
363             z.avail_out = out.length;
364 
365             // Write the ADLER32 checksum (stream footer).
366             int resultCode = z.deflate(JZlib.Z_FINISH);
367             if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
368                 promise.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode));
369                 return promise;
370             } else if (z.next_out_index != 0) { // lgtm[java/constant-comparison]
371                 // Suppressed a warning above to be on the safe side
372                 // even if z.next_out_index seems to be always 0 here
373                 footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index);
374             } else {
375                 footer = Unpooled.EMPTY_BUFFER;
376             }
377         } finally {
378             z.deflateEnd();
379 
380             // Deference the external references explicitly to tell the VM that
381             // the allocated byte arrays are temporary so that the call stack
382             // can be utilized.
383             // I'm not sure if the modern VMs do this optimization though.
384             z.next_in = null;
385             z.next_out = null;
386         }
387         return ctx.writeAndFlush(footer, promise);
388     }
389 
390     @Override
391     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
392         this.ctx = ctx;
393     }
394 }