1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.util.concurrent.EventExecutor;
23 import io.netty.util.concurrent.PromiseNotifier;
24 import io.netty.util.internal.EmptyArrays;
25 import io.netty.util.internal.ObjectUtil;
26 import io.netty.util.internal.SystemPropertyUtil;
27 import io.netty.util.internal.logging.InternalLogger;
28 import io.netty.util.internal.logging.InternalLoggerFactory;
29
30 import java.util.zip.CRC32;
31 import java.util.zip.Deflater;
32
33
34
35
36 public class JdkZlibEncoder extends ZlibEncoder {
37
38 private static final InternalLogger logger = InternalLoggerFactory.getInstance(JdkZlibEncoder.class);
39
40
41
42
43
44 private static final int MAX_INITIAL_OUTPUT_BUFFER_SIZE;
45
46
47
48 private static final int MAX_INPUT_BUFFER_SIZE;
49
50 private final ZlibWrapper wrapper;
51 private final Deflater deflater;
52 private volatile boolean finished;
53 private volatile ChannelHandlerContext ctx;
54
55
56
57
58 private final CRC32 crc;
59 private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
60 private boolean writeHeader = true;
61
62 static {
63 MAX_INITIAL_OUTPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
64 "io.netty.jdkzlib.encoder.maxInitialOutputBufferSize",
65 65536);
66 MAX_INPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
67 "io.netty.jdkzlib.encoder.maxInputBufferSize",
68 65536);
69
70 if (logger.isDebugEnabled()) {
71 logger.debug("-Dio.netty.jdkzlib.encoder.maxInitialOutputBufferSize={}", MAX_INITIAL_OUTPUT_BUFFER_SIZE);
72 logger.debug("-Dio.netty.jdkzlib.encoder.maxInputBufferSize={}", MAX_INPUT_BUFFER_SIZE);
73 }
74 }
75
76
77
78
79
80
81
82 public JdkZlibEncoder() {
83 this(6);
84 }
85
86
87
88
89
90
91
92
93
94
95
96
97 public JdkZlibEncoder(int compressionLevel) {
98 this(ZlibWrapper.ZLIB, compressionLevel);
99 }
100
101
102
103
104
105
106
107 public JdkZlibEncoder(ZlibWrapper wrapper) {
108 this(wrapper, 6);
109 }
110
111
112
113
114
115
116
117
118
119
120
121
122
123 public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
124 ObjectUtil.checkInRange(compressionLevel, Deflater.DEFAULT_COMPRESSION, Deflater.BEST_COMPRESSION,
125 "compressionLevel");
126 ObjectUtil.checkNotNull(wrapper, "wrapper");
127
128 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
129 throw new IllegalArgumentException(
130 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
131 "allowed for compression.");
132 }
133
134 this.wrapper = wrapper;
135 deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
136 this.crc = wrapper == ZlibWrapper.GZIP ? new CRC32() : null;
137 }
138
139
140
141
142
143
144
145
146
147
148
149 public JdkZlibEncoder(byte[] dictionary) {
150 this(6, dictionary);
151 }
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168 public JdkZlibEncoder(int compressionLevel, byte[] dictionary) {
169 ObjectUtil.checkInRange(compressionLevel, Deflater.DEFAULT_COMPRESSION, Deflater.BEST_COMPRESSION,
170 "compressionLevel");
171 ObjectUtil.checkNotNull(dictionary, "dictionary");
172
173 wrapper = ZlibWrapper.ZLIB;
174 deflater = new Deflater(compressionLevel);
175 deflater.setDictionary(dictionary);
176 crc = null;
177 }
178
179 @Override
180 public ChannelFuture close() {
181 return close(ctx().newPromise());
182 }
183
184 @Override
185 public ChannelFuture close(final ChannelPromise promise) {
186 ChannelHandlerContext ctx = ctx();
187 EventExecutor executor = ctx.executor();
188 if (executor.inEventLoop()) {
189 return finishEncode(ctx, promise);
190 } else {
191 final ChannelPromise p = ctx.newPromise();
192 executor.execute(() -> {
193 ChannelFuture f = finishEncode(ctx(), p);
194 PromiseNotifier.cascade(f, promise);
195 });
196 return p;
197 }
198 }
199
200 private ChannelHandlerContext ctx() {
201 ChannelHandlerContext ctx = this.ctx;
202 if (ctx == null) {
203 throw new IllegalStateException("not added to a pipeline");
204 }
205 return ctx;
206 }
207
208 @Override
209 public boolean isClosed() {
210 return finished;
211 }
212
213 @Override
214 protected void encode(ChannelHandlerContext ctx, ByteBuf uncompressed, ByteBuf out) throws Exception {
215 if (finished) {
216 out.writeBytes(uncompressed);
217 return;
218 }
219
220 int len = uncompressed.readableBytes();
221 if (len == 0) {
222 return;
223 }
224
225 if (uncompressed.hasArray()) {
226
227 encodeSome(uncompressed, out);
228 } else {
229 int heapBufferSize = Math.min(len, MAX_INPUT_BUFFER_SIZE);
230 ByteBuf heapBuf = ctx.alloc().heapBuffer(heapBufferSize, heapBufferSize);
231 try {
232 while (uncompressed.isReadable()) {
233 uncompressed.readBytes(heapBuf, Math.min(heapBuf.writableBytes(), uncompressed.readableBytes()));
234 encodeSome(heapBuf, out);
235 heapBuf.clear();
236 }
237 } finally {
238 heapBuf.release();
239 }
240 }
241
242 deflater.setInput(EmptyArrays.EMPTY_BYTES);
243 }
244
245 private void encodeSome(ByteBuf in, ByteBuf out) {
246
247
248 byte[] inAry = in.array();
249 int offset = in.arrayOffset() + in.readerIndex();
250
251 if (writeHeader) {
252 writeHeader = false;
253 if (wrapper == ZlibWrapper.GZIP) {
254 out.writeBytes(gzipHeader);
255 }
256 }
257
258 int len = in.readableBytes();
259 if (wrapper == ZlibWrapper.GZIP) {
260 crc.update(inAry, offset, len);
261 }
262
263 deflater.setInput(inAry, offset, len);
264 for (;;) {
265 deflate(out);
266 if (!out.isWritable()) {
267
268
269 out.ensureWritable(out.writerIndex());
270 } else if (deflater.needsInput()) {
271
272 break;
273 }
274 }
275 in.skipBytes(len);
276 }
277
278 @Override
279 protected final ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg,
280 boolean preferDirect) throws Exception {
281 int sizeEstimate = (int) Math.ceil(msg.readableBytes() * 1.001) + 12;
282 if (writeHeader) {
283 switch (wrapper) {
284 case GZIP:
285 sizeEstimate += gzipHeader.length;
286 break;
287 case ZLIB:
288 sizeEstimate += 2;
289 break;
290 default:
291
292 }
293 }
294
295 if (sizeEstimate < 0 || sizeEstimate > MAX_INITIAL_OUTPUT_BUFFER_SIZE) {
296
297 return ctx.alloc().heapBuffer(MAX_INITIAL_OUTPUT_BUFFER_SIZE);
298 }
299 return ctx.alloc().heapBuffer(sizeEstimate);
300 }
301
302 @Override
303 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
304 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
305 EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
306 }
307
308 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
309 if (finished) {
310 promise.setSuccess();
311 return promise;
312 }
313
314 finished = true;
315 ByteBuf footer = ctx.alloc().heapBuffer();
316 if (writeHeader && wrapper == ZlibWrapper.GZIP) {
317
318 writeHeader = false;
319 footer.writeBytes(gzipHeader);
320 }
321
322 deflater.finish();
323
324 while (!deflater.finished()) {
325 deflate(footer);
326 if (!footer.isWritable()) {
327
328 ctx.write(footer);
329 footer = ctx.alloc().heapBuffer();
330 }
331 }
332 if (wrapper == ZlibWrapper.GZIP) {
333 int crcValue = (int) crc.getValue();
334 int uncBytes = deflater.getTotalIn();
335 footer.writeIntLE(crcValue);
336 footer.writeIntLE(uncBytes);
337 }
338 deflater.end();
339 return ctx.writeAndFlush(footer, promise);
340 }
341
342 private void deflate(ByteBuf out) {
343 int numBytes;
344 do {
345 int writerIndex = out.writerIndex();
346 numBytes = deflater.deflate(
347 out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH);
348 out.writerIndex(writerIndex + numBytes);
349 } while (numBytes > 0);
350 }
351
352 @Override
353 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
354 this.ctx = ctx;
355 }
356 }