1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.Unpooled;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.channel.embedded.EmbeddedChannel;
23 import io.netty.handler.codec.ByteToMessageDecoder;
24 import io.netty.handler.codec.compression.BrotliEncoder;
25 import io.netty.handler.codec.compression.ZlibCodecFactory;
26 import io.netty.handler.codec.compression.ZlibWrapper;
27 import io.netty.handler.codec.compression.Brotli;
28 import io.netty.handler.codec.compression.BrotliOptions;
29 import io.netty.handler.codec.compression.CompressionOptions;
30 import io.netty.handler.codec.compression.DeflateOptions;
31 import io.netty.handler.codec.compression.GzipOptions;
32 import io.netty.handler.codec.compression.StandardCompressionOptions;
33 import io.netty.handler.codec.compression.ZstdEncoder;
34 import io.netty.handler.codec.compression.ZstdOptions;
35 import io.netty.handler.codec.compression.SnappyFrameEncoder;
36 import io.netty.handler.codec.compression.SnappyOptions;
37 import io.netty.util.concurrent.PromiseCombiner;
38 import io.netty.util.internal.ObjectUtil;
39 import io.netty.util.internal.UnstableApi;
40
41 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
42 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
43 import static io.netty.handler.codec.http.HttpHeaderValues.BR;
44 import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
45 import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
46 import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
47 import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
48 import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
49 import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
50 import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;
51
52
53
54
55
56 @UnstableApi
57 public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
58
59 public static final int DEFAULT_COMPRESSION_LEVEL = 6;
60 public static final int DEFAULT_WINDOW_BITS = 15;
61 public static final int DEFAULT_MEM_LEVEL = 8;
62
63 private int compressionLevel;
64 private int windowBits;
65 private int memLevel;
66 private final Http2Connection.PropertyKey propertyKey;
67
68 private final boolean supportsCompressionOptions;
69
70 private BrotliOptions brotliOptions;
71 private GzipOptions gzipCompressionOptions;
72 private DeflateOptions deflateOptions;
73 private ZstdOptions zstdOptions;
74 private SnappyOptions snappyOptions;
75
76
77
78
79
80 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
81 this(delegate, defaultCompressionOptions());
82 }
83
84 private static CompressionOptions[] defaultCompressionOptions() {
85 if (Brotli.isAvailable()) {
86 return new CompressionOptions[] {
87 StandardCompressionOptions.brotli(),
88 StandardCompressionOptions.snappy(),
89 StandardCompressionOptions.gzip(),
90 StandardCompressionOptions.deflate() };
91 }
92 return new CompressionOptions[] { StandardCompressionOptions.snappy(),
93 StandardCompressionOptions.gzip(), StandardCompressionOptions.deflate() };
94 }
95
96
97
98
99 @Deprecated
100 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate, int compressionLevel, int windowBits,
101 int memLevel) {
102 super(delegate);
103 this.compressionLevel = ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
104 this.windowBits = ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
105 this.memLevel = ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
106
107 propertyKey = connection().newKey();
108 connection().addListener(new Http2ConnectionAdapter() {
109 @Override
110 public void onStreamRemoved(Http2Stream stream) {
111 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
112 if (compressor != null) {
113 cleanup(stream, compressor);
114 }
115 }
116 });
117
118 supportsCompressionOptions = false;
119 }
120
121
122
123
124
125 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate,
126 CompressionOptions... compressionOptionsArgs) {
127 super(delegate);
128 ObjectUtil.checkNotNull(compressionOptionsArgs, "CompressionOptions");
129 ObjectUtil.deepCheckNotNull("CompressionOptions", compressionOptionsArgs);
130
131 for (CompressionOptions compressionOptions : compressionOptionsArgs) {
132
133
134
135
136
137
138 if (Brotli.isAvailable() && compressionOptions instanceof BrotliOptions) {
139 brotliOptions = (BrotliOptions) compressionOptions;
140 } else if (compressionOptions instanceof GzipOptions) {
141 gzipCompressionOptions = (GzipOptions) compressionOptions;
142 } else if (compressionOptions instanceof DeflateOptions) {
143 deflateOptions = (DeflateOptions) compressionOptions;
144 } else if (compressionOptions instanceof ZstdOptions) {
145 zstdOptions = (ZstdOptions) compressionOptions;
146 } else if (compressionOptions instanceof SnappyOptions) {
147 snappyOptions = (SnappyOptions) compressionOptions;
148 } else {
149 throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
150 ": " + compressionOptions);
151 }
152 }
153
154 supportsCompressionOptions = true;
155
156 propertyKey = connection().newKey();
157 connection().addListener(new Http2ConnectionAdapter() {
158 @Override
159 public void onStreamRemoved(Http2Stream stream) {
160 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
161 if (compressor != null) {
162 cleanup(stream, compressor);
163 }
164 }
165 });
166 }
167
168 @Override
169 public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
170 final boolean endOfStream, ChannelPromise promise) {
171 final Http2Stream stream = connection().stream(streamId);
172 final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
173 if (channel == null) {
174
175 return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
176 }
177
178 try {
179
180 channel.writeOutbound(data);
181 ByteBuf buf = nextReadableBuf(channel);
182 if (buf == null) {
183 if (endOfStream) {
184 if (channel.finish()) {
185 buf = nextReadableBuf(channel);
186 }
187 return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
188 true, promise);
189 }
190
191 promise.setSuccess();
192 return promise;
193 }
194
195 PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
196 for (;;) {
197 ByteBuf nextBuf = nextReadableBuf(channel);
198 boolean compressedEndOfStream = nextBuf == null && endOfStream;
199 if (compressedEndOfStream && channel.finish()) {
200 nextBuf = nextReadableBuf(channel);
201 compressedEndOfStream = nextBuf == null;
202 }
203
204 ChannelPromise bufPromise = ctx.newPromise();
205 combiner.add(bufPromise);
206 super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
207 if (nextBuf == null) {
208 break;
209 }
210
211 padding = 0;
212 buf = nextBuf;
213 }
214 combiner.finish(promise);
215 } catch (Throwable cause) {
216 promise.tryFailure(cause);
217 } finally {
218 if (endOfStream) {
219 cleanup(stream, channel);
220 }
221 }
222 return promise;
223 }
224
225 @Override
226 public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
227 boolean endStream, ChannelPromise promise) {
228 try {
229
230 EmbeddedChannel compressor = newCompressor(ctx, headers, endStream);
231
232
233 ChannelFuture future = super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
234
235
236 bindCompressorToStream(compressor, streamId);
237
238 return future;
239 } catch (Throwable e) {
240 promise.tryFailure(e);
241 }
242 return promise;
243 }
244
245 @Override
246 public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
247 final int streamDependency, final short weight, final boolean exclusive, final int padding,
248 final boolean endOfStream, final ChannelPromise promise) {
249 try {
250
251 EmbeddedChannel compressor = newCompressor(ctx, headers, endOfStream);
252
253
254 ChannelFuture future = super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
255 padding, endOfStream, promise);
256
257
258 bindCompressorToStream(compressor, streamId);
259
260 return future;
261 } catch (Throwable e) {
262 promise.tryFailure(e);
263 }
264 return promise;
265 }
266
267
268
269
270
271
272
273
274
275
276
277 protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSequence contentEncoding)
278 throws Http2Exception {
279 if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
280 return newCompressionChannel(ctx, ZlibWrapper.GZIP);
281 }
282 if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
283 return newCompressionChannel(ctx, ZlibWrapper.ZLIB);
284 }
285 if (Brotli.isAvailable() && brotliOptions != null && BR.contentEqualsIgnoreCase(contentEncoding)) {
286 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
287 ctx.channel().config(), new BrotliEncoder(brotliOptions.parameters()));
288 }
289 if (zstdOptions != null && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
290 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
291 ctx.channel().config(), new ZstdEncoder(zstdOptions.compressionLevel(),
292 zstdOptions.blockSize(), zstdOptions.maxEncodeSize()));
293 }
294 if (snappyOptions != null && SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
295 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
296 ctx.channel().config(), new SnappyFrameEncoder());
297 }
298
299 return null;
300 }
301
302
303
304
305
306
307
308
309
310 protected CharSequence getTargetContentEncoding(CharSequence contentEncoding) throws Http2Exception {
311 return contentEncoding;
312 }
313
314
315
316
317
318
319 private EmbeddedChannel newCompressionChannel(final ChannelHandlerContext ctx, ZlibWrapper wrapper) {
320 if (supportsCompressionOptions) {
321 if (wrapper == ZlibWrapper.GZIP && gzipCompressionOptions != null) {
322 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
323 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
324 gzipCompressionOptions.compressionLevel(), gzipCompressionOptions.windowBits(),
325 gzipCompressionOptions.memLevel()));
326 } else if (wrapper == ZlibWrapper.ZLIB && deflateOptions != null) {
327 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
328 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
329 deflateOptions.compressionLevel(), deflateOptions.windowBits(),
330 deflateOptions.memLevel()));
331 } else {
332 throw new IllegalArgumentException("Unsupported ZlibWrapper: " + wrapper);
333 }
334 } else {
335 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
336 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits,
337 memLevel));
338 }
339 }
340
341
342
343
344
345
346
347
348
349
350
351 private EmbeddedChannel newCompressor(ChannelHandlerContext ctx, Http2Headers headers, boolean endOfStream)
352 throws Http2Exception {
353 if (endOfStream) {
354 return null;
355 }
356
357 CharSequence encoding = headers.get(CONTENT_ENCODING);
358 if (encoding == null) {
359 encoding = IDENTITY;
360 }
361 final EmbeddedChannel compressor = newContentCompressor(ctx, encoding);
362 if (compressor != null) {
363 CharSequence targetContentEncoding = getTargetContentEncoding(encoding);
364 if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
365 headers.remove(CONTENT_ENCODING);
366 } else {
367 headers.set(CONTENT_ENCODING, targetContentEncoding);
368 }
369
370
371
372
373 headers.remove(CONTENT_LENGTH);
374 }
375
376 return compressor;
377 }
378
379
380
381
382
383
384 private void bindCompressorToStream(EmbeddedChannel compressor, int streamId) {
385 if (compressor != null) {
386 Http2Stream stream = connection().stream(streamId);
387 if (stream != null) {
388 stream.setProperty(propertyKey, compressor);
389 }
390 }
391 }
392
393
394
395
396
397
398
399 void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
400 compressor.finishAndReleaseAll();
401 stream.removeProperty(propertyKey);
402 }
403
404
405
406
407
408
409
410 private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
411 for (;;) {
412 final ByteBuf buf = compressor.readOutbound();
413 if (buf == null) {
414 return null;
415 }
416 if (!buf.isReadable()) {
417 buf.release();
418 continue;
419 }
420 return buf;
421 }
422 }
423 }