1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.compression;
17
18 import java.util.concurrent.atomic.AtomicBoolean;
19
20 import org.jboss.netty.buffer.ChannelBuffer;
21 import org.jboss.netty.buffer.ChannelBuffers;
22 import org.jboss.netty.channel.Channel;
23 import org.jboss.netty.channel.ChannelEvent;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.Channels;
29 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
30 import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder;
31 import org.jboss.netty.util.internal.jzlib.JZlib;
32 import org.jboss.netty.util.internal.jzlib.ZStream;
33
34
35
36
37
38
39
40 public class ZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler {
41
42 private static final byte[] EMPTY_ARRAY = new byte[0];
43
44 private final ZStream z = new ZStream();
45 private final AtomicBoolean finished = new AtomicBoolean();
46 private volatile ChannelHandlerContext ctx;
47
48
49
50
51
52
53
54
55 public ZlibEncoder() {
56 this(6);
57 }
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public ZlibEncoder(int compressionLevel) {
72 this(ZlibWrapper.ZLIB, compressionLevel);
73 }
74
75
76
77
78
79
80
81
82 public ZlibEncoder(ZlibWrapper wrapper) {
83 this(wrapper, 6);
84 }
85
86
87
88
89
90
91
92
93
94
95
96
97
98 public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
99 this(wrapper, compressionLevel, 15, 8);
100 }
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124 public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
125 if (compressionLevel < 0 || compressionLevel > 9) {
126 throw new IllegalArgumentException(
127 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
128 }
129 if (windowBits < 9 || windowBits > 15) {
130 throw new IllegalArgumentException(
131 "windowBits: " + windowBits + " (expected: 9-15)");
132 }
133 if (memLevel < 1 || memLevel > 9) {
134 throw new IllegalArgumentException(
135 "memLevel: " + memLevel + " (expected: 1-9)");
136 }
137 if (wrapper == null) {
138 throw new NullPointerException("wrapper");
139 }
140 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
141 throw new IllegalArgumentException(
142 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
143 "allowed for compression.");
144 }
145
146 synchronized (z) {
147 int resultCode = z.deflateInit(compressionLevel, windowBits, memLevel,
148 ZlibUtil.convertWrapperType(wrapper));
149 if (resultCode != JZlib.Z_OK) {
150 ZlibUtil.fail(z, "initialization failure", resultCode);
151 }
152 }
153 }
154
155
156
157
158
159
160
161
162
163
164
165
166 public ZlibEncoder(byte[] dictionary) {
167 this(6, dictionary);
168 }
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185 public ZlibEncoder(int compressionLevel, byte[] dictionary) {
186 this(compressionLevel, 15, 8, dictionary);
187 }
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214 public ZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
215 if (compressionLevel < 0 || compressionLevel > 9) {
216 throw new IllegalArgumentException(
217 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
218 }
219 if (windowBits < 9 || windowBits > 15) {
220 throw new IllegalArgumentException(
221 "windowBits: " + windowBits + " (expected: 9-15)");
222 }
223 if (memLevel < 1 || memLevel > 9) {
224 throw new IllegalArgumentException(
225 "memLevel: " + memLevel + " (expected: 1-9)");
226 }
227 if (dictionary == null) {
228 throw new NullPointerException("dictionary");
229 }
230
231 synchronized (z) {
232 int resultCode;
233 resultCode = z.deflateInit(compressionLevel, windowBits, memLevel,
234 JZlib.W_ZLIB);
235 if (resultCode != JZlib.Z_OK) {
236 ZlibUtil.fail(z, "initialization failure", resultCode);
237 } else {
238 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
239 if (resultCode != JZlib.Z_OK) {
240 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
241 }
242 }
243 }
244 }
245
246 public ChannelFuture close() {
247 ChannelHandlerContext ctx = this.ctx;
248 if (ctx == null) {
249 throw new IllegalStateException("not added to a pipeline");
250 }
251 return finishEncode(ctx, null);
252 }
253
254 public boolean isClosed() {
255 return finished.get();
256 }
257
258 @Override
259 protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
260 if (!(msg instanceof ChannelBuffer) || finished.get()) {
261 return msg;
262 }
263
264 ChannelBuffer result;
265 synchronized (z) {
266 try {
267
268 ChannelBuffer uncompressed = (ChannelBuffer) msg;
269 byte[] in = new byte[uncompressed.readableBytes()];
270 uncompressed.readBytes(in);
271 z.next_in = in;
272 z.next_in_index = 0;
273 z.avail_in = in.length;
274
275
276 byte[] out = new byte[(int) Math.ceil(in.length * 1.001) + 12];
277 z.next_out = out;
278 z.next_out_index = 0;
279 z.avail_out = out.length;
280
281
282 int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
283 if (resultCode != JZlib.Z_OK) {
284 ZlibUtil.fail(z, "compression failure", resultCode);
285 }
286
287 if (z.next_out_index != 0) {
288 result = ctx.getChannel().getConfig().getBufferFactory().getBuffer(
289 uncompressed.order(), out, 0, z.next_out_index);
290 } else {
291 result = ChannelBuffers.EMPTY_BUFFER;
292 }
293 } finally {
294
295
296
297
298 z.next_in = null;
299 z.next_out = null;
300 }
301 }
302
303 return result;
304 }
305
306 @Override
307 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
308 throws Exception {
309 if (evt instanceof ChannelStateEvent) {
310 ChannelStateEvent e = (ChannelStateEvent) evt;
311 switch (e.getState()) {
312 case OPEN:
313 case CONNECTED:
314 case BOUND:
315 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
316 finishEncode(ctx, evt);
317 return;
318 }
319 }
320 }
321
322 super.handleDownstream(ctx, evt);
323 }
324
325 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
326 if (!finished.compareAndSet(false, true)) {
327 if (evt != null) {
328 ctx.sendDownstream(evt);
329 }
330 return Channels.succeededFuture(ctx.getChannel());
331 }
332
333 ChannelBuffer footer;
334 ChannelFuture future;
335 synchronized (z) {
336 try {
337
338 z.next_in = EMPTY_ARRAY;
339 z.next_in_index = 0;
340 z.avail_in = 0;
341
342
343 byte[] out = new byte[32];
344 z.next_out = out;
345 z.next_out_index = 0;
346 z.avail_out = out.length;
347
348
349 int resultCode = z.deflate(JZlib.Z_FINISH);
350 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
351 future = Channels.failedFuture(
352 ctx.getChannel(),
353 ZlibUtil.exception(z, "compression failure", resultCode));
354 footer = null;
355 } else if (z.next_out_index != 0) {
356 future = Channels.future(ctx.getChannel());
357 footer =
358 ctx.getChannel().getConfig().getBufferFactory().getBuffer(
359 out, 0, z.next_out_index);
360 } else {
361
362
363
364 future = Channels.future(ctx.getChannel());
365 footer = ChannelBuffers.EMPTY_BUFFER;
366 }
367 } finally {
368 z.deflateEnd();
369
370
371
372
373
374 z.next_in = null;
375 z.next_out = null;
376 }
377 }
378
379 if (footer != null) {
380 Channels.write(ctx, future, footer);
381 }
382
383 if (evt != null) {
384 future.addListener(new ChannelFutureListener() {
385 public void operationComplete(ChannelFuture future) throws Exception {
386 ctx.sendDownstream(evt);
387 }
388 });
389 }
390
391 return future;
392 }
393
394 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
395 this.ctx = ctx;
396 }
397
398 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
399
400 }
401
402 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
403
404 }
405
406 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
407
408 }
409 }