View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.util.concurrent.EventExecutor;
23  import io.netty.util.concurrent.PromiseNotifier;
24  import io.netty.util.internal.EmptyArrays;
25  import io.netty.util.internal.ObjectUtil;
26  import io.netty.util.internal.SystemPropertyUtil;
27  import io.netty.util.internal.logging.InternalLogger;
28  import io.netty.util.internal.logging.InternalLoggerFactory;
29  
30  import java.util.zip.CRC32;
31  import java.util.zip.Deflater;
32  
33  /**
34   * Compresses a {@link ByteBuf} using the deflate algorithm.
35   */
36  public class JdkZlibEncoder extends ZlibEncoder {
37  
38      private static final InternalLogger logger = InternalLoggerFactory.getInstance(JdkZlibEncoder.class);
39  
40      /**
41       * Maximum initial size for temporary heap buffers used for the compressed output. Buffer may still grow beyond
42       * this if necessary.
43       */
44      private static final int MAX_INITIAL_OUTPUT_BUFFER_SIZE;
45      /**
46       * Max size for temporary heap buffers used to copy input data to heap.
47       */
48      private static final int MAX_INPUT_BUFFER_SIZE;
49  
50      private final ZlibWrapper wrapper;
51      private final Deflater deflater;
52      private volatile boolean finished;
53      private volatile ChannelHandlerContext ctx;
54  
55      /*
56       * GZIP support
57       */
58      private final CRC32 crc = new CRC32();
59      private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
60      private boolean writeHeader = true;
61  
62      static {
63          MAX_INITIAL_OUTPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
64                  "io.netty.jdkzlib.encoder.maxInitialOutputBufferSize",
65                  65536);
66          MAX_INPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
67                  "io.netty.jdkzlib.encoder.maxInputBufferSize",
68                  65536);
69  
70          if (logger.isDebugEnabled()) {
71              logger.debug("-Dio.netty.jdkzlib.encoder.maxInitialOutputBufferSize={}", MAX_INITIAL_OUTPUT_BUFFER_SIZE);
72              logger.debug("-Dio.netty.jdkzlib.encoder.maxInputBufferSize={}", MAX_INPUT_BUFFER_SIZE);
73          }
74      }
75  
76      /**
77       * Creates a new zlib encoder with the default compression level ({@code 6})
78       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
79       *
80       * @throws CompressionException if failed to initialize zlib
81       */
82      public JdkZlibEncoder() {
83          this(6);
84      }
85  
86      /**
87       * Creates a new zlib encoder with the specified {@code compressionLevel}
88       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
89       *
90       * @param compressionLevel
91       *        {@code 1} yields the fastest compression and {@code 9} yields the
92       *        best compression.  {@code 0} means no compression.  The default
93       *        compression level is {@code 6}.
94       *
95       * @throws CompressionException if failed to initialize zlib
96       */
97      public JdkZlibEncoder(int compressionLevel) {
98          this(ZlibWrapper.ZLIB, compressionLevel);
99      }
100 
101     /**
102      * Creates a new zlib encoder with the default compression level ({@code 6})
103      * and the specified wrapper.
104      *
105      * @throws CompressionException if failed to initialize zlib
106      */
107     public JdkZlibEncoder(ZlibWrapper wrapper) {
108         this(wrapper, 6);
109     }
110 
111     /**
112      * Creates a new zlib encoder with the specified {@code compressionLevel}
113      * and the specified wrapper.
114      *
115      * @param compressionLevel
116      *        {@code 1} yields the fastest compression and {@code 9} yields the
117      *        best compression.  {@code 0} means no compression.  The default
118      *        compression level is {@code 6}.
119      *
120      * @throws CompressionException if failed to initialize zlib
121      */
122     public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
123         ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
124         ObjectUtil.checkNotNull(wrapper, "wrapper");
125 
126         if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
127             throw new IllegalArgumentException(
128                     "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
129                     "allowed for compression.");
130         }
131 
132         this.wrapper = wrapper;
133         deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
134     }
135 
136     /**
137      * Creates a new zlib encoder with the default compression level ({@code 6})
138      * and the specified preset dictionary.  The wrapper is always
139      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
140      * the preset dictionary.
141      *
142      * @param dictionary  the preset dictionary
143      *
144      * @throws CompressionException if failed to initialize zlib
145      */
146     public JdkZlibEncoder(byte[] dictionary) {
147         this(6, dictionary);
148     }
149 
150     /**
151      * Creates a new zlib encoder with the specified {@code compressionLevel}
152      * and the specified preset dictionary.  The wrapper is always
153      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
154      * the preset dictionary.
155      *
156      * @param compressionLevel
157      *        {@code 1} yields the fastest compression and {@code 9} yields the
158      *        best compression.  {@code 0} means no compression.  The default
159      *        compression level is {@code 6}.
160      * @param dictionary  the preset dictionary
161      *
162      * @throws CompressionException if failed to initialize zlib
163      */
164     public JdkZlibEncoder(int compressionLevel, byte[] dictionary) {
165         ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
166         ObjectUtil.checkNotNull(dictionary, "dictionary");
167 
168         wrapper = ZlibWrapper.ZLIB;
169         deflater = new Deflater(compressionLevel);
170         deflater.setDictionary(dictionary);
171     }
172 
173     @Override
174     public ChannelFuture close() {
175         return close(ctx().newPromise());
176     }
177 
178     @Override
179     public ChannelFuture close(final ChannelPromise promise) {
180         ChannelHandlerContext ctx = ctx();
181         EventExecutor executor = ctx.executor();
182         if (executor.inEventLoop()) {
183             return finishEncode(ctx, promise);
184         } else {
185             final ChannelPromise p = ctx.newPromise();
186             executor.execute(new Runnable() {
187                 @Override
188                 public void run() {
189                     ChannelFuture f = finishEncode(ctx(), p);
190                     PromiseNotifier.cascade(f, promise);
191                 }
192             });
193             return p;
194         }
195     }
196 
197     private ChannelHandlerContext ctx() {
198         ChannelHandlerContext ctx = this.ctx;
199         if (ctx == null) {
200             throw new IllegalStateException("not added to a pipeline");
201         }
202         return ctx;
203     }
204 
205     @Override
206     public boolean isClosed() {
207         return finished;
208     }
209 
210     @Override
211     protected void encode(ChannelHandlerContext ctx, ByteBuf uncompressed, ByteBuf out) throws Exception {
212         if (finished) {
213             out.writeBytes(uncompressed);
214             return;
215         }
216 
217         int len = uncompressed.readableBytes();
218         if (len == 0) {
219             return;
220         }
221 
222         if (uncompressed.hasArray()) {
223             // if it is backed by an array we not need to do a copy at all
224             encodeSome(uncompressed, out);
225         } else {
226             int heapBufferSize = Math.min(len, MAX_INPUT_BUFFER_SIZE);
227             ByteBuf heapBuf = ctx.alloc().heapBuffer(heapBufferSize, heapBufferSize);
228             try {
229                 while (uncompressed.isReadable()) {
230                     uncompressed.readBytes(heapBuf, Math.min(heapBuf.writableBytes(), uncompressed.readableBytes()));
231                     encodeSome(heapBuf, out);
232                     heapBuf.clear();
233                 }
234             } finally {
235                 heapBuf.release();
236             }
237         }
238         // clear input so that we don't keep an unnecessary reference to the input array
239         deflater.setInput(EmptyArrays.EMPTY_BYTES);
240     }
241 
242     private void encodeSome(ByteBuf in, ByteBuf out) {
243         // both in and out are heap buffers, here
244 
245         byte[] inAry = in.array();
246         int offset = in.arrayOffset() + in.readerIndex();
247 
248         if (writeHeader) {
249             writeHeader = false;
250             if (wrapper == ZlibWrapper.GZIP) {
251                 out.writeBytes(gzipHeader);
252             }
253         }
254 
255         int len = in.readableBytes();
256         if (wrapper == ZlibWrapper.GZIP) {
257             crc.update(inAry, offset, len);
258         }
259 
260         deflater.setInput(inAry, offset, len);
261         for (;;) {
262             deflate(out);
263             if (!out.isWritable()) {
264                 // The buffer is not writable anymore. Increase the capacity to make more room.
265                 // Can't rely on needsInput here, it might return true even if there's still data to be written.
266                 out.ensureWritable(out.writerIndex());
267             } else if (deflater.needsInput()) {
268                 // Consumed everything
269                 break;
270             }
271         }
272         in.skipBytes(len);
273     }
274 
275     @Override
276     protected final ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg,
277                                            boolean preferDirect) throws Exception {
278         int sizeEstimate = (int) Math.ceil(msg.readableBytes() * 1.001) + 12;
279         if (writeHeader) {
280             switch (wrapper) {
281                 case GZIP:
282                     sizeEstimate += gzipHeader.length;
283                     break;
284                 case ZLIB:
285                     sizeEstimate += 2; // first two magic bytes
286                     break;
287                 default:
288                     // no op
289             }
290         }
291         // sizeEstimate might overflow if close to 2G
292         if (sizeEstimate < 0 || sizeEstimate > MAX_INITIAL_OUTPUT_BUFFER_SIZE) {
293             // can always expand later
294             return ctx.alloc().heapBuffer(MAX_INITIAL_OUTPUT_BUFFER_SIZE);
295         }
296         return ctx.alloc().heapBuffer(sizeEstimate);
297     }
298 
299     @Override
300     public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
301         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
302         EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
303     }
304 
305     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
306         if (finished) {
307             promise.setSuccess();
308             return promise;
309         }
310 
311         finished = true;
312         ByteBuf footer = ctx.alloc().heapBuffer();
313         if (writeHeader && wrapper == ZlibWrapper.GZIP) {
314             // Write the GZIP header first if not written yet. (i.e. user wrote nothing.)
315             writeHeader = false;
316             footer.writeBytes(gzipHeader);
317         }
318 
319         deflater.finish();
320 
321         while (!deflater.finished()) {
322             deflate(footer);
323             if (!footer.isWritable()) {
324                 // no more space so write it to the channel and continue
325                 ctx.write(footer);
326                 footer = ctx.alloc().heapBuffer();
327             }
328         }
329         if (wrapper == ZlibWrapper.GZIP) {
330             int crcValue = (int) crc.getValue();
331             int uncBytes = deflater.getTotalIn();
332             footer.writeByte(crcValue);
333             footer.writeByte(crcValue >>> 8);
334             footer.writeByte(crcValue >>> 16);
335             footer.writeByte(crcValue >>> 24);
336             footer.writeByte(uncBytes);
337             footer.writeByte(uncBytes >>> 8);
338             footer.writeByte(uncBytes >>> 16);
339             footer.writeByte(uncBytes >>> 24);
340         }
341         deflater.end();
342         return ctx.writeAndFlush(footer, promise);
343     }
344 
345     private void deflate(ByteBuf out) {
346         int numBytes;
347         do {
348             int writerIndex = out.writerIndex();
349             numBytes = deflater.deflate(
350                     out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH);
351             out.writerIndex(writerIndex + numBytes);
352         } while (numBytes > 0);
353     }
354 
355     @Override
356     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
357         this.ctx = ctx;
358     }
359 }