1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.compression;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBuffers;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelEvent;
22 import org.jboss.netty.channel.ChannelFuture;
23 import org.jboss.netty.channel.ChannelFutureListener;
24 import org.jboss.netty.channel.ChannelHandlerContext;
25 import org.jboss.netty.channel.ChannelStateEvent;
26 import org.jboss.netty.channel.Channels;
27 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
28 import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder;
29 import org.jboss.netty.util.internal.jzlib.JZlib;
30 import org.jboss.netty.util.internal.jzlib.ZStream;
31
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34
35
36
37
38
39
40 public class ZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler {
41
42 private static final byte[] EMPTY_ARRAY = new byte[0];
43
44 private final int wrapperOverhead;
45 private final ZStream z = new ZStream();
46 private final AtomicBoolean finished = new AtomicBoolean();
47 private volatile ChannelHandlerContext ctx;
48
49
50
51
52
53
54
55
56 public ZlibEncoder() {
57 this(6);
58 }
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public ZlibEncoder(int compressionLevel) {
73 this(ZlibWrapper.ZLIB, compressionLevel);
74 }
75
76
77
78
79
80
81
82
83 public ZlibEncoder(ZlibWrapper wrapper) {
84 this(wrapper, 6);
85 }
86
87
88
89
90
91
92
93
94
95
96
97
98
99 public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
100 this(wrapper, compressionLevel, 15, 8);
101 }
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125 public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
126 if (compressionLevel < 0 || compressionLevel > 9) {
127 throw new IllegalArgumentException(
128 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
129 }
130 if (windowBits < 9 || windowBits > 15) {
131 throw new IllegalArgumentException(
132 "windowBits: " + windowBits + " (expected: 9-15)");
133 }
134 if (memLevel < 1 || memLevel > 9) {
135 throw new IllegalArgumentException(
136 "memLevel: " + memLevel + " (expected: 1-9)");
137 }
138 if (wrapper == null) {
139 throw new NullPointerException("wrapper");
140 }
141 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
142 throw new IllegalArgumentException(
143 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
144 "allowed for compression.");
145 }
146
147 wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper);
148
149 synchronized (z) {
150 int resultCode = z.deflateInit(compressionLevel, windowBits, memLevel,
151 ZlibUtil.convertWrapperType(wrapper));
152 if (resultCode != JZlib.Z_OK) {
153 ZlibUtil.fail(z, "initialization failure", resultCode);
154 }
155 }
156 }
157
158
159
160
161
162
163
164
165
166
167
168
169 public ZlibEncoder(byte[] dictionary) {
170 this(6, dictionary);
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188 public ZlibEncoder(int compressionLevel, byte[] dictionary) {
189 this(compressionLevel, 15, 8, dictionary);
190 }
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217 public ZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
218 if (compressionLevel < 0 || compressionLevel > 9) {
219 throw new IllegalArgumentException(
220 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
221 }
222 if (windowBits < 9 || windowBits > 15) {
223 throw new IllegalArgumentException(
224 "windowBits: " + windowBits + " (expected: 9-15)");
225 }
226 if (memLevel < 1 || memLevel > 9) {
227 throw new IllegalArgumentException(
228 "memLevel: " + memLevel + " (expected: 1-9)");
229 }
230 if (dictionary == null) {
231 throw new NullPointerException("dictionary");
232 }
233
234 wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB);
235
236 synchronized (z) {
237 int resultCode;
238 resultCode = z.deflateInit(compressionLevel, windowBits, memLevel,
239 JZlib.W_ZLIB);
240 if (resultCode != JZlib.Z_OK) {
241 ZlibUtil.fail(z, "initialization failure", resultCode);
242 } else {
243 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
244 if (resultCode != JZlib.Z_OK) {
245 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
246 }
247 }
248 }
249 }
250
251 public ChannelFuture close() {
252 ChannelHandlerContext ctx = this.ctx;
253 if (ctx == null) {
254 throw new IllegalStateException("not added to a pipeline");
255 }
256 return finishEncode(ctx, null);
257 }
258
259 public boolean isClosed() {
260 return finished.get();
261 }
262
263 @Override
264 protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
265 if (!(msg instanceof ChannelBuffer) || finished.get()) {
266 return msg;
267 }
268
269 final ChannelBuffer result;
270 synchronized (z) {
271 try {
272
273 final ChannelBuffer uncompressed = (ChannelBuffer) msg;
274 final int uncompressedLen = uncompressed.readableBytes();
275 if (uncompressedLen == 0) {
276 return uncompressed;
277 }
278
279 final byte[] in = new byte[uncompressedLen];
280 uncompressed.readBytes(in);
281 z.next_in = in;
282 z.next_in_index = 0;
283 z.avail_in = uncompressedLen;
284
285
286 final byte[] out = new byte[(int) Math.ceil(uncompressedLen * 1.001) + 12 + wrapperOverhead];
287 z.next_out = out;
288 z.next_out_index = 0;
289 z.avail_out = out.length;
290
291
292 final int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
293 if (resultCode != JZlib.Z_OK) {
294 ZlibUtil.fail(z, "compression failure", resultCode);
295 }
296
297 if (z.next_out_index != 0) {
298 result = ctx.getChannel().getConfig().getBufferFactory().getBuffer(
299 uncompressed.order(), out, 0, z.next_out_index);
300 } else {
301 result = ChannelBuffers.EMPTY_BUFFER;
302 }
303 } finally {
304
305
306
307
308 z.next_in = null;
309 z.next_out = null;
310 }
311 }
312
313 return result;
314 }
315
316 @Override
317 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
318 throws Exception {
319 if (evt instanceof ChannelStateEvent) {
320 ChannelStateEvent e = (ChannelStateEvent) evt;
321 switch (e.getState()) {
322 case OPEN:
323 case CONNECTED:
324 case BOUND:
325 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
326 finishEncode(ctx, evt);
327 return;
328 }
329 }
330 }
331
332 super.handleDownstream(ctx, evt);
333 }
334
335 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
336 if (!finished.compareAndSet(false, true)) {
337 if (evt != null) {
338 ctx.sendDownstream(evt);
339 }
340 return Channels.succeededFuture(ctx.getChannel());
341 }
342
343 ChannelBuffer footer;
344 ChannelFuture future;
345 synchronized (z) {
346 try {
347
348 z.next_in = EMPTY_ARRAY;
349 z.next_in_index = 0;
350 z.avail_in = 0;
351
352
353 byte[] out = new byte[32];
354 z.next_out = out;
355 z.next_out_index = 0;
356 z.avail_out = out.length;
357
358
359 int resultCode = z.deflate(JZlib.Z_FINISH);
360 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
361 future = Channels.failedFuture(
362 ctx.getChannel(),
363 ZlibUtil.exception(z, "compression failure", resultCode));
364 footer = null;
365 } else if (z.next_out_index != 0) {
366 future = Channels.future(ctx.getChannel());
367 footer =
368 ctx.getChannel().getConfig().getBufferFactory().getBuffer(
369 out, 0, z.next_out_index);
370 } else {
371
372
373
374 future = Channels.future(ctx.getChannel());
375 footer = ChannelBuffers.EMPTY_BUFFER;
376 }
377 } finally {
378 z.deflateEnd();
379
380
381
382
383
384 z.next_in = null;
385 z.next_out = null;
386 }
387 }
388
389 if (footer != null) {
390 Channels.write(ctx, future, footer);
391 }
392
393 if (evt != null) {
394 future.addListener(new ChannelFutureListener() {
395 public void operationComplete(ChannelFuture future) throws Exception {
396 ctx.sendDownstream(evt);
397 }
398 });
399 }
400
401 return future;
402 }
403
404 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
405 this.ctx = ctx;
406 }
407
408 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
409
410 }
411
412 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
413
414 }
415
416 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
417
418 }
419 }