1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.compression;
17
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import java.util.zip.CRC32;
20 import java.util.zip.Deflater;
21
22 import org.jboss.netty.buffer.ChannelBuffer;
23 import org.jboss.netty.buffer.ChannelBuffers;
24 import org.jboss.netty.channel.Channel;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelFuture;
27 import org.jboss.netty.channel.ChannelFutureListener;
28 import org.jboss.netty.channel.ChannelHandlerContext;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.Channels;
31 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
32 import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder;
33
34
35
36
37
38
39
40 public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler {
41
42 private final byte[] out = new byte[8192];
43 private final Deflater deflater;
44 private final AtomicBoolean finished = new AtomicBoolean();
45 private volatile ChannelHandlerContext ctx;
46
47
48
49
50 private final boolean gzip;
51 private final CRC32 crc = new CRC32();
52 private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
53 private boolean writeHeader = true;
54
55
56
57
58
59
60
61 public JdkZlibEncoder() {
62 this(6);
63 }
64
65
66
67
68
69
70
71
72
73
74
75
76 public JdkZlibEncoder(int compressionLevel) {
77 this(ZlibWrapper.ZLIB, compressionLevel);
78 }
79
80
81
82
83
84
85
86 public JdkZlibEncoder(ZlibWrapper wrapper) {
87 this(wrapper, 6);
88 }
89
90
91
92
93
94
95
96
97
98
99
100
101 public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
102 if (compressionLevel < 0 || compressionLevel > 9) {
103 throw new IllegalArgumentException(
104 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
105 }
106 if (wrapper == null) {
107 throw new NullPointerException("wrapper");
108 }
109 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
110 throw new IllegalArgumentException(
111 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
112 "allowed for compression.");
113 }
114
115 gzip = wrapper == ZlibWrapper.GZIP;
116 deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
117 }
118
119
120
121
122
123
124
125
126
127
128
129 public JdkZlibEncoder(byte[] dictionary) {
130 this(6, dictionary);
131 }
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 public JdkZlibEncoder(int compressionLevel, byte[] dictionary) {
148 if (compressionLevel < 0 || compressionLevel > 9) {
149 throw new IllegalArgumentException(
150 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
151 }
152 if (dictionary == null) {
153 throw new NullPointerException("dictionary");
154 }
155
156 gzip = false;
157 deflater = new Deflater(compressionLevel);
158 deflater.setDictionary(dictionary);
159 }
160
161 public ChannelFuture close() {
162 ChannelHandlerContext ctx = this.ctx;
163 if (ctx == null) {
164 throw new IllegalStateException("not added to a pipeline");
165 }
166 return finishEncode(ctx, null);
167 }
168
169 public boolean isClosed() {
170 return finished.get();
171 }
172
173 @Override
174 protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
175 if (!(msg instanceof ChannelBuffer) || finished.get()) {
176 return msg;
177 }
178
179 ChannelBuffer uncompressed = (ChannelBuffer) msg;
180 byte[] in = new byte[uncompressed.readableBytes()];
181 uncompressed.readBytes(in);
182
183 int sizeEstimate = (int) Math.ceil(in.length * 1.001) + 12;
184 ChannelBuffer compressed = ChannelBuffers.dynamicBuffer(sizeEstimate, channel.getConfig().getBufferFactory());
185
186 synchronized (deflater) {
187 if (gzip) {
188 crc.update(in);
189 if (writeHeader) {
190 compressed.writeBytes(gzipHeader);
191 writeHeader = false;
192 }
193 }
194
195 deflater.setInput(in);
196 while (!deflater.needsInput()) {
197 int numBytes = deflater.deflate(out, 0, out.length, Deflater.SYNC_FLUSH);
198 compressed.writeBytes(out, 0, numBytes);
199 }
200 }
201
202 return compressed;
203 }
204
205 @Override
206 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
207 throws Exception {
208 if (evt instanceof ChannelStateEvent) {
209 ChannelStateEvent e = (ChannelStateEvent) evt;
210 switch (e.getState()) {
211 case OPEN:
212 case CONNECTED:
213 case BOUND:
214 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
215 finishEncode(ctx, evt);
216 return;
217 }
218 }
219 }
220
221 super.handleDownstream(ctx, evt);
222 }
223
224 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
225 ChannelFuture future = Channels.succeededFuture(ctx.getChannel());
226
227 if (!finished.compareAndSet(false, true)) {
228 if (evt != null) {
229 ctx.sendDownstream(evt);
230 }
231 return future;
232 }
233
234 ChannelBuffer footer = ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory());
235 synchronized (deflater) {
236 deflater.finish();
237 while (!deflater.finished()) {
238 int numBytes = deflater.deflate(out, 0, out.length);
239 footer.writeBytes(out, 0, numBytes);
240 }
241 if (gzip) {
242 int crcValue = (int) crc.getValue();
243 int uncBytes = deflater.getTotalIn();
244 footer.writeByte(crcValue);
245 footer.writeByte(crcValue >>> 8);
246 footer.writeByte(crcValue >>> 16);
247 footer.writeByte(crcValue >>> 24);
248 footer.writeByte(uncBytes);
249 footer.writeByte(uncBytes >>> 8);
250 footer.writeByte(uncBytes >>> 16);
251 footer.writeByte(uncBytes >>> 24);
252 }
253 deflater.end();
254 }
255
256 if (footer.readable()) {
257 future = Channels.future(ctx.getChannel());
258 Channels.write(ctx, future, footer);
259 }
260
261 if (evt != null) {
262 future.addListener(new ChannelFutureListener() {
263 public void operationComplete(ChannelFuture future) throws Exception {
264 ctx.sendDownstream(evt);
265 }
266 });
267 }
268
269 return future;
270 }
271
272 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
273 this.ctx = ctx;
274 }
275
276 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
277
278 }
279
280 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
281
282 }
283
284 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
285
286 }
287 }