View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.channel.ChannelPromiseNotifier;
24  import io.netty.util.concurrent.EventExecutor;
25  
26  import java.util.concurrent.TimeUnit;
27  import java.util.zip.CRC32;
28  import java.util.zip.Deflater;
29  
30  /**
31   * Compresses a {@link ByteBuf} using the deflate algorithm.
32   */
33  public class JdkZlibEncoder extends ZlibEncoder {
34  
35      private final ZlibWrapper wrapper;
36      private final Deflater deflater;
37      private volatile boolean finished;
38      private volatile ChannelHandlerContext ctx;
39  
40      /*
41       * GZIP support
42       */
43      private final CRC32 crc = new CRC32();
44      private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
45      private boolean writeHeader = true;
46  
47      /**
48       * Creates a new zlib encoder with the default compression level ({@code 6})
49       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
50       *
51       * @throws CompressionException if failed to initialize zlib
52       */
53      public JdkZlibEncoder() {
54          this(6);
55      }
56  
57      /**
58       * Creates a new zlib encoder with the specified {@code compressionLevel}
59       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
60       *
61       * @param compressionLevel
62       *        {@code 1} yields the fastest compression and {@code 9} yields the
63       *        best compression.  {@code 0} means no compression.  The default
64       *        compression level is {@code 6}.
65       *
66       * @throws CompressionException if failed to initialize zlib
67       */
68      public JdkZlibEncoder(int compressionLevel) {
69          this(ZlibWrapper.ZLIB, compressionLevel);
70      }
71  
72      /**
73       * Creates a new zlib encoder with the default compression level ({@code 6})
74       * and the specified wrapper.
75       *
76       * @throws CompressionException if failed to initialize zlib
77       */
78      public JdkZlibEncoder(ZlibWrapper wrapper) {
79          this(wrapper, 6);
80      }
81  
82      /**
83       * Creates a new zlib encoder with the specified {@code compressionLevel}
84       * and the specified wrapper.
85       *
86       * @param compressionLevel
87       *        {@code 1} yields the fastest compression and {@code 9} yields the
88       *        best compression.  {@code 0} means no compression.  The default
89       *        compression level is {@code 6}.
90       *
91       * @throws CompressionException if failed to initialize zlib
92       */
93      public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
94          if (compressionLevel < 0 || compressionLevel > 9) {
95              throw new IllegalArgumentException(
96                      "compressionLevel: " + compressionLevel + " (expected: 0-9)");
97          }
98          if (wrapper == null) {
99              throw new NullPointerException("wrapper");
100         }
101         if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
102             throw new IllegalArgumentException(
103                     "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
104                     "allowed for compression.");
105         }
106 
107         this.wrapper = wrapper;
108         deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
109     }
110 
111     /**
112      * Creates a new zlib encoder with the default compression level ({@code 6})
113      * and the specified preset dictionary.  The wrapper is always
114      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
115      * the preset dictionary.
116      *
117      * @param dictionary  the preset dictionary
118      *
119      * @throws CompressionException if failed to initialize zlib
120      */
121     public JdkZlibEncoder(byte[] dictionary) {
122         this(6, dictionary);
123     }
124 
125     /**
126      * Creates a new zlib encoder with the specified {@code compressionLevel}
127      * and the specified preset dictionary.  The wrapper is always
128      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
129      * the preset dictionary.
130      *
131      * @param compressionLevel
132      *        {@code 1} yields the fastest compression and {@code 9} yields the
133      *        best compression.  {@code 0} means no compression.  The default
134      *        compression level is {@code 6}.
135      * @param dictionary  the preset dictionary
136      *
137      * @throws CompressionException if failed to initialize zlib
138      */
139     public JdkZlibEncoder(int compressionLevel, byte[] dictionary) {
140         if (compressionLevel < 0 || compressionLevel > 9) {
141             throw new IllegalArgumentException(
142                     "compressionLevel: " + compressionLevel + " (expected: 0-9)");
143         }
144         if (dictionary == null) {
145             throw new NullPointerException("dictionary");
146         }
147 
148         wrapper = ZlibWrapper.ZLIB;
149         deflater = new Deflater(compressionLevel);
150         deflater.setDictionary(dictionary);
151     }
152 
153     @Override
154     public ChannelFuture close() {
155         return close(ctx().newPromise());
156     }
157 
158     @Override
159     public ChannelFuture close(final ChannelPromise promise) {
160         ChannelHandlerContext ctx = ctx();
161         EventExecutor executor = ctx.executor();
162         if (executor.inEventLoop()) {
163             return finishEncode(ctx, promise);
164         } else {
165             final ChannelPromise p = ctx.newPromise();
166             executor.execute(new Runnable() {
167                 @Override
168                 public void run() {
169                     ChannelFuture f = finishEncode(ctx(), p);
170                     f.addListener(new ChannelPromiseNotifier(promise));
171                 }
172             });
173             return p;
174         }
175     }
176 
177     private ChannelHandlerContext ctx() {
178         ChannelHandlerContext ctx = this.ctx;
179         if (ctx == null) {
180             throw new IllegalStateException("not added to a pipeline");
181         }
182         return ctx;
183     }
184 
185     @Override
186     public boolean isClosed() {
187         return finished;
188     }
189 
190     @Override
191     protected void encode(ChannelHandlerContext ctx, ByteBuf uncompressed, ByteBuf out) throws Exception {
192         if (finished) {
193             out.writeBytes(uncompressed);
194             return;
195         }
196 
197         int len = uncompressed.readableBytes();
198         if (len == 0) {
199             return;
200         }
201 
202         int offset;
203         byte[] inAry;
204         if (uncompressed.hasArray()) {
205             // if it is backed by an array we not need to to do a copy at all
206             inAry = uncompressed.array();
207             offset = uncompressed.arrayOffset() + uncompressed.readerIndex();
208             // skip all bytes as we will consume all of them
209             uncompressed.skipBytes(len);
210         } else {
211             inAry = new byte[len];
212             uncompressed.readBytes(inAry);
213             offset = 0;
214         }
215 
216         if (writeHeader) {
217             writeHeader = false;
218             if (wrapper == ZlibWrapper.GZIP) {
219                 out.writeBytes(gzipHeader);
220             }
221         }
222 
223         if (wrapper == ZlibWrapper.GZIP) {
224             crc.update(inAry, offset, len);
225         }
226 
227         deflater.setInput(inAry, offset, len);
228         while (!deflater.needsInput()) {
229             deflate(out);
230         }
231     }
232 
233     @Override
234     protected final ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg,
235                                            boolean preferDirect) throws Exception {
236         int sizeEstimate = (int) Math.ceil(msg.readableBytes() * 1.001) + 12;
237         if (writeHeader) {
238             switch (wrapper) {
239                 case GZIP:
240                     sizeEstimate += gzipHeader.length;
241                     break;
242                 case ZLIB:
243                     sizeEstimate += 2; // first two magic bytes
244                     break;
245                 default:
246                     // no op
247             }
248         }
249         return ctx.alloc().heapBuffer(sizeEstimate);
250     }
251 
252     @Override
253     public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
254         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
255         f.addListener(new ChannelFutureListener() {
256             @Override
257             public void operationComplete(ChannelFuture f) throws Exception {
258                 ctx.close(promise);
259             }
260         });
261 
262         if (!f.isDone()) {
263             // Ensure the channel is closed even if the write operation completes in time.
264             ctx.executor().schedule(new Runnable() {
265                 @Override
266                 public void run() {
267                     ctx.close(promise);
268                 }
269             }, 10, TimeUnit.SECONDS); // FIXME: Magic number
270         }
271     }
272 
273     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
274         if (finished) {
275             promise.setSuccess();
276             return promise;
277         }
278 
279         finished = true;
280         ByteBuf footer = ctx.alloc().heapBuffer();
281         if (writeHeader && wrapper == ZlibWrapper.GZIP) {
282             // Write the GZIP header first if not written yet. (i.e. user wrote nothing.)
283             writeHeader = false;
284             footer.writeBytes(gzipHeader);
285         }
286 
287         deflater.finish();
288 
289         while (!deflater.finished()) {
290             deflate(footer);
291             if (!footer.isWritable()) {
292                 // no more space so write it to the channel and continue
293                 ctx.write(footer);
294                 footer = ctx.alloc().heapBuffer();
295             }
296         }
297         if (wrapper == ZlibWrapper.GZIP) {
298             int crcValue = (int) crc.getValue();
299             int uncBytes = deflater.getTotalIn();
300             footer.writeByte(crcValue);
301             footer.writeByte(crcValue >>> 8);
302             footer.writeByte(crcValue >>> 16);
303             footer.writeByte(crcValue >>> 24);
304             footer.writeByte(uncBytes);
305             footer.writeByte(uncBytes >>> 8);
306             footer.writeByte(uncBytes >>> 16);
307             footer.writeByte(uncBytes >>> 24);
308         }
309         deflater.end();
310         return ctx.writeAndFlush(footer, promise);
311     }
312 
313     private void deflate(ByteBuf out) {
314         int numBytes;
315         do {
316             int writerIndex = out.writerIndex();
317             numBytes = deflater.deflate(
318                     out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH);
319             out.writerIndex(writerIndex + numBytes);
320         } while (numBytes > 0);
321     }
322 
323     @Override
324     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
325         this.ctx = ctx;
326     }
327 }