1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.Unpooled;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.channel.embedded.EmbeddedChannel;
23 import io.netty.handler.codec.ByteToMessageDecoder;
24 import io.netty.handler.codec.compression.BrotliEncoder;
25 import io.netty.handler.codec.compression.ZlibCodecFactory;
26 import io.netty.handler.codec.compression.ZlibWrapper;
27 import io.netty.handler.codec.compression.Brotli;
28 import io.netty.handler.codec.compression.BrotliOptions;
29 import io.netty.handler.codec.compression.CompressionOptions;
30 import io.netty.handler.codec.compression.DeflateOptions;
31 import io.netty.handler.codec.compression.GzipOptions;
32 import io.netty.handler.codec.compression.StandardCompressionOptions;
33 import io.netty.handler.codec.compression.Zstd;
34 import io.netty.handler.codec.compression.ZstdEncoder;
35 import io.netty.handler.codec.compression.ZstdOptions;
36 import io.netty.handler.codec.compression.SnappyFrameEncoder;
37 import io.netty.handler.codec.compression.SnappyOptions;
38 import io.netty.util.concurrent.PromiseCombiner;
39 import io.netty.util.internal.ObjectUtil;
40
41 import java.util.ArrayList;
42 import java.util.List;
43
44 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
45 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
46 import static io.netty.handler.codec.http.HttpHeaderValues.BR;
47 import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
48 import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
49 import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
50 import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
51 import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
52 import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
53 import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;
54
55
56
57
58
59 public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
60
61 public static final int DEFAULT_COMPRESSION_LEVEL = 6;
62 public static final int DEFAULT_WINDOW_BITS = 15;
63 public static final int DEFAULT_MEM_LEVEL = 8;
64
65 private int compressionLevel;
66 private int windowBits;
67 private int memLevel;
68 private final Http2Connection.PropertyKey propertyKey;
69
70 private final boolean supportsCompressionOptions;
71
72 private BrotliOptions brotliOptions;
73 private GzipOptions gzipCompressionOptions;
74 private DeflateOptions deflateOptions;
75 private ZstdOptions zstdOptions;
76 private SnappyOptions snappyOptions;
77
78
79
80
81
82 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
83 this(delegate, defaultCompressionOptions());
84 }
85
86 private static CompressionOptions[] defaultCompressionOptions() {
87 List<CompressionOptions> compressionOptions = new ArrayList<CompressionOptions>();
88 compressionOptions.add(StandardCompressionOptions.gzip());
89 compressionOptions.add(StandardCompressionOptions.deflate());
90 compressionOptions.add(StandardCompressionOptions.snappy());
91 if (Brotli.isAvailable()) {
92 compressionOptions.add(StandardCompressionOptions.brotli());
93 }
94 if (Zstd.isAvailable()) {
95 compressionOptions.add(StandardCompressionOptions.zstd());
96 }
97 return compressionOptions.toArray(new CompressionOptions[0]);
98 }
99
100
101
102
103 @Deprecated
104 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate, int compressionLevel, int windowBits,
105 int memLevel) {
106 super(delegate);
107 this.compressionLevel = ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
108 this.windowBits = ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
109 this.memLevel = ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
110
111 propertyKey = connection().newKey();
112 connection().addListener(new Http2ConnectionAdapter() {
113 @Override
114 public void onStreamRemoved(Http2Stream stream) {
115 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
116 if (compressor != null) {
117 cleanup(stream, compressor);
118 }
119 }
120 });
121
122 supportsCompressionOptions = false;
123 }
124
125
126
127
128
129 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate,
130 CompressionOptions... compressionOptionsArgs) {
131 super(delegate);
132 ObjectUtil.checkNotNull(compressionOptionsArgs, "CompressionOptions");
133 ObjectUtil.deepCheckNotNull("CompressionOptions", compressionOptionsArgs);
134
135 for (CompressionOptions compressionOptions : compressionOptionsArgs) {
136
137
138
139
140
141
142 if (Brotli.isAvailable() && compressionOptions instanceof BrotliOptions) {
143 brotliOptions = (BrotliOptions) compressionOptions;
144 } else if (compressionOptions instanceof GzipOptions) {
145 gzipCompressionOptions = (GzipOptions) compressionOptions;
146 } else if (compressionOptions instanceof DeflateOptions) {
147 deflateOptions = (DeflateOptions) compressionOptions;
148 } else if (compressionOptions instanceof ZstdOptions) {
149 zstdOptions = (ZstdOptions) compressionOptions;
150 } else if (compressionOptions instanceof SnappyOptions) {
151 snappyOptions = (SnappyOptions) compressionOptions;
152 } else {
153 throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
154 ": " + compressionOptions);
155 }
156 }
157
158 supportsCompressionOptions = true;
159
160 propertyKey = connection().newKey();
161 connection().addListener(new Http2ConnectionAdapter() {
162 @Override
163 public void onStreamRemoved(Http2Stream stream) {
164 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
165 if (compressor != null) {
166 cleanup(stream, compressor);
167 }
168 }
169 });
170 }
171
172 @Override
173 public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
174 final boolean endOfStream, ChannelPromise promise) {
175 final Http2Stream stream = connection().stream(streamId);
176 final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
177 if (channel == null) {
178
179 return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
180 }
181
182 try {
183
184 channel.writeOutbound(data);
185 ByteBuf buf = nextReadableBuf(channel);
186 if (buf == null) {
187 if (endOfStream) {
188 if (channel.finish()) {
189 buf = nextReadableBuf(channel);
190 }
191 return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
192 true, promise);
193 }
194
195 promise.setSuccess();
196 return promise;
197 }
198
199 PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
200 for (;;) {
201 ByteBuf nextBuf = nextReadableBuf(channel);
202 boolean compressedEndOfStream = nextBuf == null && endOfStream;
203 if (compressedEndOfStream && channel.finish()) {
204 nextBuf = nextReadableBuf(channel);
205 compressedEndOfStream = nextBuf == null;
206 }
207
208 ChannelPromise bufPromise = ctx.newPromise();
209 combiner.add(bufPromise);
210 super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
211 if (nextBuf == null) {
212 break;
213 }
214
215 padding = 0;
216 buf = nextBuf;
217 }
218 combiner.finish(promise);
219 } catch (Throwable cause) {
220 promise.tryFailure(cause);
221 } finally {
222 if (endOfStream) {
223 cleanup(stream, channel);
224 }
225 }
226 return promise;
227 }
228
229 @Override
230 public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
231 boolean endStream, ChannelPromise promise) {
232 try {
233
234 EmbeddedChannel compressor = newCompressor(ctx, headers, endStream);
235
236
237 ChannelFuture future = super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
238
239
240 bindCompressorToStream(compressor, streamId);
241
242 return future;
243 } catch (Throwable e) {
244 promise.tryFailure(e);
245 }
246 return promise;
247 }
248
249 @Override
250 public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
251 final int streamDependency, final short weight, final boolean exclusive, final int padding,
252 final boolean endOfStream, final ChannelPromise promise) {
253 try {
254
255 EmbeddedChannel compressor = newCompressor(ctx, headers, endOfStream);
256
257
258 ChannelFuture future = super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
259 padding, endOfStream, promise);
260
261
262 bindCompressorToStream(compressor, streamId);
263
264 return future;
265 } catch (Throwable e) {
266 promise.tryFailure(e);
267 }
268 return promise;
269 }
270
271
272
273
274
275
276
277
278
279
280
281 protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSequence contentEncoding)
282 throws Http2Exception {
283 if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
284 return newCompressionChannel(ctx, ZlibWrapper.GZIP);
285 }
286 if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
287 return newCompressionChannel(ctx, ZlibWrapper.ZLIB);
288 }
289 if (Brotli.isAvailable() && brotliOptions != null && BR.contentEqualsIgnoreCase(contentEncoding)) {
290 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
291 ctx.channel().config(), new BrotliEncoder(brotliOptions.parameters()));
292 }
293 if (zstdOptions != null && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
294 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
295 ctx.channel().config(), new ZstdEncoder(zstdOptions.compressionLevel(),
296 zstdOptions.blockSize(), zstdOptions.maxEncodeSize()));
297 }
298 if (snappyOptions != null && SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
299 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
300 ctx.channel().config(), new SnappyFrameEncoder());
301 }
302
303 return null;
304 }
305
306
307
308
309
310
311
312
313
314 protected CharSequence getTargetContentEncoding(CharSequence contentEncoding) throws Http2Exception {
315 return contentEncoding;
316 }
317
318
319
320
321
322
323 private EmbeddedChannel newCompressionChannel(final ChannelHandlerContext ctx, ZlibWrapper wrapper) {
324 if (supportsCompressionOptions) {
325 if (wrapper == ZlibWrapper.GZIP && gzipCompressionOptions != null) {
326 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
327 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
328 gzipCompressionOptions.compressionLevel(), gzipCompressionOptions.windowBits(),
329 gzipCompressionOptions.memLevel()));
330 } else if (wrapper == ZlibWrapper.ZLIB && deflateOptions != null) {
331 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
332 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
333 deflateOptions.compressionLevel(), deflateOptions.windowBits(),
334 deflateOptions.memLevel()));
335 } else {
336 throw new IllegalArgumentException("Unsupported ZlibWrapper: " + wrapper);
337 }
338 } else {
339 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
340 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits,
341 memLevel));
342 }
343 }
344
345
346
347
348
349
350
351
352
353
354
355 private EmbeddedChannel newCompressor(ChannelHandlerContext ctx, Http2Headers headers, boolean endOfStream)
356 throws Http2Exception {
357 if (endOfStream) {
358 return null;
359 }
360
361 CharSequence encoding = headers.get(CONTENT_ENCODING);
362 if (encoding == null) {
363 encoding = IDENTITY;
364 }
365 final EmbeddedChannel compressor = newContentCompressor(ctx, encoding);
366 if (compressor != null) {
367 CharSequence targetContentEncoding = getTargetContentEncoding(encoding);
368 if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
369 headers.remove(CONTENT_ENCODING);
370 } else {
371 headers.set(CONTENT_ENCODING, targetContentEncoding);
372 }
373
374
375
376
377 headers.remove(CONTENT_LENGTH);
378 }
379
380 return compressor;
381 }
382
383
384
385
386
387
388 private void bindCompressorToStream(EmbeddedChannel compressor, int streamId) {
389 if (compressor != null) {
390 Http2Stream stream = connection().stream(streamId);
391 if (stream != null) {
392 stream.setProperty(propertyKey, compressor);
393 }
394 }
395 }
396
397
398
399
400
401
402
403 void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
404 compressor.finishAndReleaseAll();
405 stream.removeProperty(propertyKey);
406 }
407
408
409
410
411
412
413
414 private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
415 for (;;) {
416 final ByteBuf buf = compressor.readOutbound();
417 if (buf == null) {
418 return null;
419 }
420 if (!buf.isReadable()) {
421 buf.release();
422 continue;
423 }
424 return buf;
425 }
426 }
427 }