1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelFutureListener;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelPromise;
23 import io.netty.channel.ChannelPromiseNotifier;
24 import io.netty.util.concurrent.EventExecutor;
25
26 import java.util.concurrent.TimeUnit;
27 import java.util.zip.CRC32;
28 import java.util.zip.Deflater;
29
30
31
32
33 public class JdkZlibEncoder extends ZlibEncoder {
34
35 private final ZlibWrapper wrapper;
36 private final Deflater deflater;
37 private volatile boolean finished;
38 private volatile ChannelHandlerContext ctx;
39
40
41
42
43 private final CRC32 crc = new CRC32();
44 private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
45 private boolean writeHeader = true;
46
47
48
49
50
51
52
53 public JdkZlibEncoder() {
54 this(6);
55 }
56
57
58
59
60
61
62
63
64
65
66
67
68 public JdkZlibEncoder(int compressionLevel) {
69 this(ZlibWrapper.ZLIB, compressionLevel);
70 }
71
72
73
74
75
76
77
78 public JdkZlibEncoder(ZlibWrapper wrapper) {
79 this(wrapper, 6);
80 }
81
82
83
84
85
86
87
88
89
90
91
92
93 public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
94 if (compressionLevel < 0 || compressionLevel > 9) {
95 throw new IllegalArgumentException(
96 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
97 }
98 if (wrapper == null) {
99 throw new NullPointerException("wrapper");
100 }
101 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
102 throw new IllegalArgumentException(
103 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
104 "allowed for compression.");
105 }
106
107 this.wrapper = wrapper;
108 deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
109 }
110
111
112
113
114
115
116
117
118
119
120
121 public JdkZlibEncoder(byte[] dictionary) {
122 this(6, dictionary);
123 }
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139 public JdkZlibEncoder(int compressionLevel, byte[] dictionary) {
140 if (compressionLevel < 0 || compressionLevel > 9) {
141 throw new IllegalArgumentException(
142 "compressionLevel: " + compressionLevel + " (expected: 0-9)");
143 }
144 if (dictionary == null) {
145 throw new NullPointerException("dictionary");
146 }
147
148 wrapper = ZlibWrapper.ZLIB;
149 deflater = new Deflater(compressionLevel);
150 deflater.setDictionary(dictionary);
151 }
152
153 @Override
154 public ChannelFuture close() {
155 return close(ctx().newPromise());
156 }
157
158 @Override
159 public ChannelFuture close(final ChannelPromise promise) {
160 ChannelHandlerContext ctx = ctx();
161 EventExecutor executor = ctx.executor();
162 if (executor.inEventLoop()) {
163 return finishEncode(ctx, promise);
164 } else {
165 final ChannelPromise p = ctx.newPromise();
166 executor.execute(new Runnable() {
167 @Override
168 public void run() {
169 ChannelFuture f = finishEncode(ctx(), p);
170 f.addListener(new ChannelPromiseNotifier(promise));
171 }
172 });
173 return p;
174 }
175 }
176
177 private ChannelHandlerContext ctx() {
178 ChannelHandlerContext ctx = this.ctx;
179 if (ctx == null) {
180 throw new IllegalStateException("not added to a pipeline");
181 }
182 return ctx;
183 }
184
185 @Override
186 public boolean isClosed() {
187 return finished;
188 }
189
190 @Override
191 protected void encode(ChannelHandlerContext ctx, ByteBuf uncompressed, ByteBuf out) throws Exception {
192 if (finished) {
193 out.writeBytes(uncompressed);
194 return;
195 }
196
197 int len = uncompressed.readableBytes();
198 if (len == 0) {
199 return;
200 }
201
202 int offset;
203 byte[] inAry;
204 if (uncompressed.hasArray()) {
205
206 inAry = uncompressed.array();
207 offset = uncompressed.arrayOffset() + uncompressed.readerIndex();
208
209 uncompressed.skipBytes(len);
210 } else {
211 inAry = new byte[len];
212 uncompressed.readBytes(inAry);
213 offset = 0;
214 }
215
216 if (writeHeader) {
217 writeHeader = false;
218 if (wrapper == ZlibWrapper.GZIP) {
219 out.writeBytes(gzipHeader);
220 }
221 }
222
223 if (wrapper == ZlibWrapper.GZIP) {
224 crc.update(inAry, offset, len);
225 }
226
227 deflater.setInput(inAry, offset, len);
228 while (!deflater.needsInput()) {
229 deflate(out);
230 }
231 }
232
233 @Override
234 protected final ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg,
235 boolean preferDirect) throws Exception {
236 int sizeEstimate = (int) Math.ceil(msg.readableBytes() * 1.001) + 12;
237 if (writeHeader) {
238 switch (wrapper) {
239 case GZIP:
240 sizeEstimate += gzipHeader.length;
241 break;
242 case ZLIB:
243 sizeEstimate += 2;
244 break;
245 default:
246
247 }
248 }
249 return ctx.alloc().heapBuffer(sizeEstimate);
250 }
251
252 @Override
253 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
254 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
255 f.addListener(new ChannelFutureListener() {
256 @Override
257 public void operationComplete(ChannelFuture f) throws Exception {
258 ctx.close(promise);
259 }
260 });
261
262 if (!f.isDone()) {
263
264 ctx.executor().schedule(new Runnable() {
265 @Override
266 public void run() {
267 ctx.close(promise);
268 }
269 }, 10, TimeUnit.SECONDS);
270 }
271 }
272
273 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
274 if (finished) {
275 promise.setSuccess();
276 return promise;
277 }
278
279 finished = true;
280 ByteBuf footer = ctx.alloc().heapBuffer();
281 if (writeHeader && wrapper == ZlibWrapper.GZIP) {
282
283 writeHeader = false;
284 footer.writeBytes(gzipHeader);
285 }
286
287 deflater.finish();
288
289 while (!deflater.finished()) {
290 deflate(footer);
291 if (!footer.isWritable()) {
292
293 ctx.write(footer);
294 footer = ctx.alloc().heapBuffer();
295 }
296 }
297 if (wrapper == ZlibWrapper.GZIP) {
298 int crcValue = (int) crc.getValue();
299 int uncBytes = deflater.getTotalIn();
300 footer.writeByte(crcValue);
301 footer.writeByte(crcValue >>> 8);
302 footer.writeByte(crcValue >>> 16);
303 footer.writeByte(crcValue >>> 24);
304 footer.writeByte(uncBytes);
305 footer.writeByte(uncBytes >>> 8);
306 footer.writeByte(uncBytes >>> 16);
307 footer.writeByte(uncBytes >>> 24);
308 }
309 deflater.end();
310 return ctx.writeAndFlush(footer, promise);
311 }
312
313 private void deflate(ByteBuf out) {
314 int numBytes;
315 do {
316 int writerIndex = out.writerIndex();
317 numBytes = deflater.deflate(
318 out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH);
319 out.writerIndex(writerIndex + numBytes);
320 } while (numBytes > 0);
321 }
322
323 @Override
324 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
325 this.ctx = ctx;
326 }
327 }