View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.compression;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.buffer.ChannelBuffers;
20  import org.jboss.netty.channel.Channel;
21  import org.jboss.netty.channel.ChannelEvent;
22  import org.jboss.netty.channel.ChannelFuture;
23  import org.jboss.netty.channel.ChannelFutureListener;
24  import org.jboss.netty.channel.ChannelHandlerContext;
25  import org.jboss.netty.channel.ChannelStateEvent;
26  import org.jboss.netty.channel.Channels;
27  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
28  import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder;
29  
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.zip.CRC32;
32  import java.util.zip.Deflater;
33  
34  
35  /**
36   * Compresses a {@link ChannelBuffer} using the deflate algorithm.
37   * @apiviz.landmark
38   * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper
39   */
40  public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler {
41  
42      private final ZlibWrapper wrapper;
43      private final Deflater deflater;
44      private final AtomicBoolean finished = new AtomicBoolean();
45      private volatile ChannelHandlerContext ctx;
46      private byte[] out;
47  
48      private final CRC32 crc;
49      private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
50      private boolean writeHeader = true;
51  
52      /**
53       * Creates a new zlib encoder with the default compression level ({@code 6})
54       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
55       *
56       * @throws CompressionException if failed to initialize zlib
57       */
58      public JdkZlibEncoder() {
59          this(6);
60      }
61  
62      /**
63       * Creates a new zlib encoder with the specified {@code compressionLevel}
64       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
65       *
66       * @param compressionLevel
67       *        {@code 1} yields the fastest compression and {@code 9} yields the
68       *        best compression.  {@code 0} means no compression.  The default
69       *        compression level is {@code 6}.
70       *
71       * @throws CompressionException if failed to initialize zlib
72       */
73      public JdkZlibEncoder(int compressionLevel) {
74          this(ZlibWrapper.ZLIB, compressionLevel);
75      }
76  
77      /**
78       * Creates a new zlib encoder with the default compression level ({@code 6})
79       * and the specified wrapper.
80       *
81       * @throws CompressionException if failed to initialize zlib
82       */
83      public JdkZlibEncoder(ZlibWrapper wrapper) {
84          this(wrapper, 6);
85      }
86  
87      /**
88       * Creates a new zlib encoder with the specified {@code compressionLevel}
89       * and the specified wrapper.
90       *
91       * @param compressionLevel
92       *        {@code 1} yields the fastest compression and {@code 9} yields the
93       *        best compression.  {@code 0} means no compression.  The default
94       *        compression level is {@code 6}.
95       *
96       * @throws CompressionException if failed to initialize zlib
97       */
98      public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
99          if (compressionLevel < 0 || compressionLevel > 9) {
100             throw new IllegalArgumentException(
101                     "compressionLevel: " + compressionLevel + " (expected: 0-9)");
102         }
103         if (wrapper == null) {
104             throw new NullPointerException("wrapper");
105         }
106         if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
107             throw new IllegalArgumentException(
108                     "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
109                     "allowed for compression.");
110         }
111 
112         this.wrapper = wrapper;
113         deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
114         if (wrapper == ZlibWrapper.GZIP) {
115             crc = new CRC32();
116         } else {
117             crc = null;
118         }
119     }
120 
121     /**
122      * Creates a new zlib encoder with the default compression level ({@code 6})
123      * and the specified preset dictionary.  The wrapper is always
124      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
125      * the preset dictionary.
126      *
127      * @param dictionary  the preset dictionary
128      *
129      * @throws CompressionException if failed to initialize zlib
130      */
131     public JdkZlibEncoder(byte[] dictionary) {
132         this(6, dictionary);
133     }
134 
135     /**
136      * Creates a new zlib encoder with the specified {@code compressionLevel}
137      * and the specified preset dictionary.  The wrapper is always
138      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
139      * the preset dictionary.
140      *
141      * @param compressionLevel
142      *        {@code 1} yields the fastest compression and {@code 9} yields the
143      *        best compression.  {@code 0} means no compression.  The default
144      *        compression level is {@code 6}.
145      * @param dictionary  the preset dictionary
146      *
147      * @throws CompressionException if failed to initialize zlib
148      */
149     public JdkZlibEncoder(int compressionLevel, byte[] dictionary) {
150         if (compressionLevel < 0 || compressionLevel > 9) {
151             throw new IllegalArgumentException(
152                     "compressionLevel: " + compressionLevel + " (expected: 0-9)");
153         }
154         if (dictionary == null) {
155             throw new NullPointerException("dictionary");
156         }
157 
158         wrapper = ZlibWrapper.ZLIB;
159         crc = null;
160         deflater = new Deflater(compressionLevel);
161         deflater.setDictionary(dictionary);
162     }
163 
164     public ChannelFuture close() {
165         ChannelHandlerContext ctx = this.ctx;
166         if (ctx == null) {
167             throw new IllegalStateException("not added to a pipeline");
168         }
169         return finishEncode(ctx, null);
170     }
171 
172     private boolean isGzip() {
173         return wrapper == ZlibWrapper.GZIP;
174     }
175 
176     public boolean isClosed() {
177         return finished.get();
178     }
179 
180     @Override
181     protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
182         if (!(msg instanceof ChannelBuffer) || finished.get()) {
183             return msg;
184         }
185 
186         final ChannelBuffer uncompressed = (ChannelBuffer) msg;
187         final int uncompressedLen = uncompressed.readableBytes();
188         if (uncompressedLen == 0) {
189             return uncompressed;
190         }
191 
192         final byte[] in = new byte[uncompressedLen];
193         uncompressed.readBytes(in);
194 
195         final int sizeEstimate = estimateCompressedSize(uncompressedLen);
196         final ChannelBuffer compressed =
197                 ChannelBuffers.dynamicBuffer(sizeEstimate, channel.getConfig().getBufferFactory());
198 
199         synchronized (deflater) {
200             if (isGzip()) {
201                 crc.update(in);
202                 if (writeHeader) {
203                     compressed.writeBytes(gzipHeader);
204                     writeHeader = false;
205                 }
206             }
207 
208             deflater.setInput(in);
209             while (!deflater.needsInput()) {
210                 deflate(compressed);
211             }
212         }
213 
214         return compressed;
215     }
216 
217     private int estimateCompressedSize(int originalSize) {
218         int sizeEstimate = (int) Math.ceil(originalSize * 1.001) + 12;
219         if (writeHeader) {
220             switch (wrapper) {
221             case GZIP:
222                 sizeEstimate += gzipHeader.length;
223                 break;
224             case ZLIB:
225                 sizeEstimate += 2; // first two magic bytes
226                 break;
227             }
228         }
229         return sizeEstimate;
230     }
231 
232     @Override
233     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
234             throws Exception {
235         if (evt instanceof ChannelStateEvent) {
236             ChannelStateEvent e = (ChannelStateEvent) evt;
237             switch (e.getState()) {
238             case OPEN:
239             case CONNECTED:
240             case BOUND:
241                 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
242                     finishEncode(ctx, evt);
243                     return;
244                 }
245             }
246         }
247 
248         super.handleDownstream(ctx, evt);
249     }
250 
251     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
252         ChannelFuture future = Channels.succeededFuture(ctx.getChannel());
253 
254         if (!finished.compareAndSet(false, true)) {
255             if (evt != null) {
256                 ctx.sendDownstream(evt);
257             }
258             return future;
259         }
260 
261         final ChannelBuffer footer = ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory());
262         final boolean gzip = isGzip();
263         synchronized (deflater) {
264             if (gzip && writeHeader) {
265                 // Write the GZIP header first if not written yet. (i.e. user wrote nothing.)
266                 writeHeader = false;
267                 footer.writeBytes(gzipHeader);
268             }
269 
270             deflater.finish();
271             while (!deflater.finished()) {
272                 deflate(footer);
273             }
274             if (gzip) {
275                 int crcValue = (int) crc.getValue();
276                 int uncBytes = deflater.getTotalIn();
277                 footer.writeByte(crcValue);
278                 footer.writeByte(crcValue >>> 8);
279                 footer.writeByte(crcValue >>> 16);
280                 footer.writeByte(crcValue >>> 24);
281                 footer.writeByte(uncBytes);
282                 footer.writeByte(uncBytes >>> 8);
283                 footer.writeByte(uncBytes >>> 16);
284                 footer.writeByte(uncBytes >>> 24);
285             }
286             deflater.end();
287         }
288 
289         if (footer.readable()) {
290             future = Channels.future(ctx.getChannel());
291             Channels.write(ctx, future, footer);
292         }
293 
294         if (evt != null) {
295             future.addListener(new ChannelFutureListener() {
296                 public void operationComplete(ChannelFuture future) throws Exception {
297                     ctx.sendDownstream(evt);
298                 }
299             });
300         }
301 
302         return future;
303     }
304 
305     private void deflate(ChannelBuffer out) {
306         int numBytes;
307         if (out.hasArray()) {
308             do {
309                 int writerIndex = out.writerIndex();
310                 numBytes = deflater.deflate(
311                         out.array(), out.arrayOffset() + writerIndex, out.writableBytes(),
312                         Deflater.SYNC_FLUSH);
313                 out.writerIndex(writerIndex + numBytes);
314             } while (numBytes > 0);
315         } else {
316             byte[] tmpOut = this.out;
317             if (tmpOut == null) {
318                 tmpOut = this.out = new byte[8192];
319             }
320 
321             do {
322                 numBytes = deflater.deflate(tmpOut, 0, tmpOut.length, Deflater.SYNC_FLUSH);
323                 out.writeBytes(tmpOut, 0, numBytes);
324             } while (numBytes > 0);
325         }
326     }
327 
328     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
329         this.ctx = ctx;
330     }
331 
332     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
333         // Unused
334     }
335 
336     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
337         // Unused
338     }
339 
340     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
341         // Unused
342     }
343 }