1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.compression;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBuffers;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelEvent;
22 import org.jboss.netty.channel.ChannelFuture;
23 import org.jboss.netty.channel.ChannelFutureListener;
24 import org.jboss.netty.channel.ChannelHandlerContext;
25 import org.jboss.netty.channel.ChannelStateEvent;
26 import org.jboss.netty.channel.Channels;
27 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
28 import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder;
29
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.zip.CRC32;
32 import java.util.zip.Deflater;
33
34
35
36
37
38
39
40 public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler {
41
42 private final ZlibWrapper wrapper;
43 private final Deflater deflater;
44 private final AtomicBoolean finished = new AtomicBoolean();
45 private volatile ChannelHandlerContext ctx;
46 private byte[] out;
47
48 private final CRC32 crc;
49 private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
50 private boolean writeHeader = true;
51
52
53
54
55
56
57
58 public JdkZlibEncoder() {
59 this(6);
60 }
61
62
63
64
65
66
67
68
69
70
71
72
73 public JdkZlibEncoder(int compressionLevel) {
74 this(ZlibWrapper.ZLIB, compressionLevel);
75 }
76
77
78
79
80
81
82
83 public JdkZlibEncoder(ZlibWrapper wrapper) {
84 this(wrapper, 6);
85 }
86
87
88
89
90
91
92
93
94
95
96
97
98 public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
99 if (compressionLevel < 0 || compressionLevel > 9) {
100 throw new IllegalArgumentException(
101 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
102 }
103 if (wrapper == null) {
104 throw new NullPointerException("wrapper");
105 }
106 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
107 throw new IllegalArgumentException(
108 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
109 "allowed for compression.");
110 }
111
112 this.wrapper = wrapper;
113 deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
114 if (wrapper == ZlibWrapper.GZIP) {
115 crc = new CRC32();
116 } else {
117 crc = null;
118 }
119 }
120
121
122
123
124
125
126
127
128
129
130
131 public JdkZlibEncoder(byte[] dictionary) {
132 this(6, dictionary);
133 }
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 public JdkZlibEncoder(int compressionLevel, byte[] dictionary) {
150 if (compressionLevel < 0 || compressionLevel > 9) {
151 throw new IllegalArgumentException(
152 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
153 }
154 if (dictionary == null) {
155 throw new NullPointerException("dictionary");
156 }
157
158 wrapper = ZlibWrapper.ZLIB;
159 crc = null;
160 deflater = new Deflater(compressionLevel);
161 deflater.setDictionary(dictionary);
162 }
163
164 public ChannelFuture close() {
165 ChannelHandlerContext ctx = this.ctx;
166 if (ctx == null) {
167 throw new IllegalStateException("not added to a pipeline");
168 }
169 return finishEncode(ctx, null);
170 }
171
172 private boolean isGzip() {
173 return wrapper == ZlibWrapper.GZIP;
174 }
175
176 public boolean isClosed() {
177 return finished.get();
178 }
179
180 @Override
181 protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
182 if (!(msg instanceof ChannelBuffer) || finished.get()) {
183 return msg;
184 }
185
186 final ChannelBuffer uncompressed = (ChannelBuffer) msg;
187 final int uncompressedLen = uncompressed.readableBytes();
188 if (uncompressedLen == 0) {
189 return uncompressed;
190 }
191
192 final byte[] in = new byte[uncompressedLen];
193 uncompressed.readBytes(in);
194
195 final int sizeEstimate = estimateCompressedSize(uncompressedLen);
196 final ChannelBuffer compressed =
197 ChannelBuffers.dynamicBuffer(sizeEstimate, channel.getConfig().getBufferFactory());
198
199 synchronized (deflater) {
200 if (isGzip()) {
201 crc.update(in);
202 if (writeHeader) {
203 compressed.writeBytes(gzipHeader);
204 writeHeader = false;
205 }
206 }
207
208 deflater.setInput(in);
209 while (!deflater.needsInput()) {
210 deflate(compressed);
211 }
212 }
213
214 return compressed;
215 }
216
217 private int estimateCompressedSize(int originalSize) {
218 int sizeEstimate = (int) Math.ceil(originalSize * 1.001) + 12;
219 if (writeHeader) {
220 switch (wrapper) {
221 case GZIP:
222 sizeEstimate += gzipHeader.length;
223 break;
224 case ZLIB:
225 sizeEstimate += 2;
226 break;
227 }
228 }
229 return sizeEstimate;
230 }
231
232 @Override
233 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
234 throws Exception {
235 if (evt instanceof ChannelStateEvent) {
236 ChannelStateEvent e = (ChannelStateEvent) evt;
237 switch (e.getState()) {
238 case OPEN:
239 case CONNECTED:
240 case BOUND:
241 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
242 finishEncode(ctx, evt);
243 return;
244 }
245 }
246 }
247
248 super.handleDownstream(ctx, evt);
249 }
250
251 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
252 ChannelFuture future = Channels.succeededFuture(ctx.getChannel());
253
254 if (!finished.compareAndSet(false, true)) {
255 if (evt != null) {
256 ctx.sendDownstream(evt);
257 }
258 return future;
259 }
260
261 final ChannelBuffer footer = ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory());
262 final boolean gzip = isGzip();
263 synchronized (deflater) {
264 if (gzip && writeHeader) {
265
266 writeHeader = false;
267 footer.writeBytes(gzipHeader);
268 }
269
270 deflater.finish();
271 while (!deflater.finished()) {
272 deflate(footer);
273 }
274 if (gzip) {
275 int crcValue = (int) crc.getValue();
276 int uncBytes = deflater.getTotalIn();
277 footer.writeByte(crcValue);
278 footer.writeByte(crcValue >>> 8);
279 footer.writeByte(crcValue >>> 16);
280 footer.writeByte(crcValue >>> 24);
281 footer.writeByte(uncBytes);
282 footer.writeByte(uncBytes >>> 8);
283 footer.writeByte(uncBytes >>> 16);
284 footer.writeByte(uncBytes >>> 24);
285 }
286 deflater.end();
287 }
288
289 if (footer.readable()) {
290 future = Channels.future(ctx.getChannel());
291 Channels.write(ctx, future, footer);
292 }
293
294 if (evt != null) {
295 future.addListener(new ChannelFutureListener() {
296 public void operationComplete(ChannelFuture future) throws Exception {
297 ctx.sendDownstream(evt);
298 }
299 });
300 }
301
302 return future;
303 }
304
305 private void deflate(ChannelBuffer out) {
306 int numBytes;
307 if (out.hasArray()) {
308 do {
309 int writerIndex = out.writerIndex();
310 numBytes = deflater.deflate(
311 out.array(), out.arrayOffset() + writerIndex, out.writableBytes(),
312 Deflater.SYNC_FLUSH);
313 out.writerIndex(writerIndex + numBytes);
314 } while (numBytes > 0);
315 } else {
316 byte[] tmpOut = this.out;
317 if (tmpOut == null) {
318 tmpOut = this.out = new byte[8192];
319 }
320
321 do {
322 numBytes = deflater.deflate(tmpOut, 0, tmpOut.length, Deflater.SYNC_FLUSH);
323 out.writeBytes(tmpOut, 0, numBytes);
324 } while (numBytes > 0);
325 }
326 }
327
328 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
329 this.ctx = ctx;
330 }
331
332 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
333
334 }
335
336 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
337
338 }
339
340 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
341
342 }
343 }