View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.compression;
17  
18  import java.util.concurrent.atomic.AtomicBoolean;
19  import java.util.zip.CRC32;
20  import java.util.zip.Deflater;
21  
22  import org.jboss.netty.buffer.ChannelBuffer;
23  import org.jboss.netty.buffer.ChannelBuffers;
24  import org.jboss.netty.channel.Channel;
25  import org.jboss.netty.channel.ChannelEvent;
26  import org.jboss.netty.channel.ChannelFuture;
27  import org.jboss.netty.channel.ChannelFutureListener;
28  import org.jboss.netty.channel.ChannelHandlerContext;
29  import org.jboss.netty.channel.ChannelStateEvent;
30  import org.jboss.netty.channel.Channels;
31  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
32  import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder;
33  
34  
35  /**
36   * Compresses a {@link ChannelBuffer} using the deflate algorithm.
37   * @apiviz.landmark
38   * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper
39   */
40  public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler {
41  
42      private final byte[] out = new byte[8192];
43      private final Deflater deflater;
44      private final AtomicBoolean finished = new AtomicBoolean();
45      private volatile ChannelHandlerContext ctx;
46  
47      /*
48       * GZIP support
49       */
50      private final boolean gzip;
51      private final CRC32 crc = new CRC32();
52      private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
53      private boolean writeHeader = true;
54  
55      /**
56       * Creates a new zlib encoder with the default compression level ({@code 6})
57       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
58       *
59       * @throws CompressionException if failed to initialize zlib
60       */
61      public JdkZlibEncoder() {
62          this(6);
63      }
64  
65      /**
66       * Creates a new zlib encoder with the specified {@code compressionLevel}
67       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
68       *
69       * @param compressionLevel
70       *        {@code 1} yields the fastest compression and {@code 9} yields the
71       *        best compression.  {@code 0} means no compression.  The default
72       *        compression level is {@code 6}.
73       *
74       * @throws CompressionException if failed to initialize zlib
75       */
76      public JdkZlibEncoder(int compressionLevel) {
77          this(ZlibWrapper.ZLIB, compressionLevel);
78      }
79  
80      /**
81       * Creates a new zlib encoder with the default compression level ({@code 6})
82       * and the specified wrapper.
83       *
84       * @throws CompressionException if failed to initialize zlib
85       */
86      public JdkZlibEncoder(ZlibWrapper wrapper) {
87          this(wrapper, 6);
88      }
89  
90      /**
91       * Creates a new zlib encoder with the specified {@code compressionLevel}
92       * and the specified wrapper.
93       *
94       * @param compressionLevel
95       *        {@code 1} yields the fastest compression and {@code 9} yields the
96       *        best compression.  {@code 0} means no compression.  The default
97       *        compression level is {@code 6}.
98       *
99       * @throws CompressionException if failed to initialize zlib
100      */
101     public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
102         if (compressionLevel < 0 || compressionLevel > 9) {
103             throw new IllegalArgumentException(
104                     "compressionLevel: " + compressionLevel + " (expected: 0-9)");
105         }
106         if (wrapper == null) {
107             throw new NullPointerException("wrapper");
108         }
109         if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
110             throw new IllegalArgumentException(
111                     "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
112                     "allowed for compression.");
113         }
114 
115         gzip = wrapper == ZlibWrapper.GZIP;
116         deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
117     }
118 
119     /**
120      * Creates a new zlib encoder with the default compression level ({@code 6})
121      * and the specified preset dictionary.  The wrapper is always
122      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
123      * the preset dictionary.
124      *
125      * @param dictionary  the preset dictionary
126      *
127      * @throws CompressionException if failed to initialize zlib
128      */
129     public JdkZlibEncoder(byte[] dictionary) {
130         this(6, dictionary);
131     }
132 
133     /**
134      * Creates a new zlib encoder with the specified {@code compressionLevel}
135      * and the specified preset dictionary.  The wrapper is always
136      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
137      * the preset dictionary.
138      *
139      * @param compressionLevel
140      *        {@code 1} yields the fastest compression and {@code 9} yields the
141      *        best compression.  {@code 0} means no compression.  The default
142      *        compression level is {@code 6}.
143      * @param dictionary  the preset dictionary
144      *
145      * @throws CompressionException if failed to initialize zlib
146      */
147     public JdkZlibEncoder(int compressionLevel, byte[] dictionary) {
148         if (compressionLevel < 0 || compressionLevel > 9) {
149             throw new IllegalArgumentException(
150                     "compressionLevel: " + compressionLevel + " (expected: 0-9)");
151         }
152         if (dictionary == null) {
153             throw new NullPointerException("dictionary");
154         }
155 
156         gzip = false;
157         deflater = new Deflater(compressionLevel);
158         deflater.setDictionary(dictionary);
159     }
160 
161     public ChannelFuture close() {
162         ChannelHandlerContext ctx = this.ctx;
163         if (ctx == null) {
164             throw new IllegalStateException("not added to a pipeline");
165         }
166         return finishEncode(ctx, null);
167     }
168 
169     public boolean isClosed() {
170         return finished.get();
171     }
172 
173     @Override
174     protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
175         if (!(msg instanceof ChannelBuffer) || finished.get()) {
176             return msg;
177         }
178 
179         ChannelBuffer uncompressed = (ChannelBuffer) msg;
180         byte[] in = new byte[uncompressed.readableBytes()];
181         uncompressed.readBytes(in);
182 
183         int sizeEstimate = (int) Math.ceil(in.length * 1.001) + 12;
184         ChannelBuffer compressed = ChannelBuffers.dynamicBuffer(sizeEstimate, channel.getConfig().getBufferFactory());
185 
186         synchronized (deflater) {
187             if (gzip) {
188                 crc.update(in);
189                 if (writeHeader) {
190                     compressed.writeBytes(gzipHeader);
191                     writeHeader = false;
192                 }
193             }
194 
195             deflater.setInput(in);
196             while (!deflater.needsInput()) {
197                 int numBytes = deflater.deflate(out, 0, out.length, Deflater.SYNC_FLUSH);
198                 compressed.writeBytes(out, 0, numBytes);
199             }
200         }
201 
202         return compressed;
203     }
204 
205     @Override
206     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
207             throws Exception {
208         if (evt instanceof ChannelStateEvent) {
209             ChannelStateEvent e = (ChannelStateEvent) evt;
210             switch (e.getState()) {
211             case OPEN:
212             case CONNECTED:
213             case BOUND:
214                 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
215                     finishEncode(ctx, evt);
216                     return;
217                 }
218             }
219         }
220 
221         super.handleDownstream(ctx, evt);
222     }
223 
224     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
225         ChannelFuture future = Channels.succeededFuture(ctx.getChannel());
226 
227         if (!finished.compareAndSet(false, true)) {
228             if (evt != null) {
229                 ctx.sendDownstream(evt);
230             }
231             return future;
232         }
233 
234         ChannelBuffer footer = ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory());
235         synchronized (deflater) {
236             deflater.finish();
237             while (!deflater.finished()) {
238                 int numBytes = deflater.deflate(out, 0, out.length);
239                 footer.writeBytes(out, 0, numBytes);
240             }
241             if (gzip) {
242                 int crcValue = (int) crc.getValue();
243                 int uncBytes = deflater.getTotalIn();
244                 footer.writeByte(crcValue);
245                 footer.writeByte(crcValue >>> 8);
246                 footer.writeByte(crcValue >>> 16);
247                 footer.writeByte(crcValue >>> 24);
248                 footer.writeByte(uncBytes);
249                 footer.writeByte(uncBytes >>> 8);
250                 footer.writeByte(uncBytes >>> 16);
251                 footer.writeByte(uncBytes >>> 24);
252             }
253             deflater.end();
254         }
255 
256         if (footer.readable()) {
257             future = Channels.future(ctx.getChannel());
258             Channels.write(ctx, future, footer);
259         }
260 
261         if (evt != null) {
262             future.addListener(new ChannelFutureListener() {
263                 public void operationComplete(ChannelFuture future) throws Exception {
264                     ctx.sendDownstream(evt);
265                 }
266             });
267         }
268 
269         return future;
270     }
271 
272     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
273         this.ctx = ctx;
274     }
275 
276     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
277         // Unused
278     }
279 
280     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
281         // Unused
282     }
283 
284     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
285         // Unused
286     }
287 }