1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.Unpooled;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelHandler;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelId;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.embedded.EmbeddedChannel;
27 import io.netty.handler.codec.ByteToMessageDecoder;
28 import io.netty.handler.codec.compression.BrotliEncoder;
29 import io.netty.handler.codec.compression.ZlibCodecFactory;
30 import io.netty.handler.codec.compression.ZlibWrapper;
31 import io.netty.handler.codec.compression.Brotli;
32 import io.netty.handler.codec.compression.BrotliOptions;
33 import io.netty.handler.codec.compression.CompressionOptions;
34 import io.netty.handler.codec.compression.DeflateOptions;
35 import io.netty.handler.codec.compression.GzipOptions;
36 import io.netty.handler.codec.compression.StandardCompressionOptions;
37 import io.netty.handler.codec.compression.Zstd;
38 import io.netty.handler.codec.compression.ZstdEncoder;
39 import io.netty.handler.codec.compression.ZstdOptions;
40 import io.netty.handler.codec.compression.SnappyFrameEncoder;
41 import io.netty.handler.codec.compression.SnappyOptions;
42 import io.netty.util.concurrent.PromiseCombiner;
43 import io.netty.util.internal.ObjectUtil;
44
45 import java.util.ArrayList;
46 import java.util.List;
47
48 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
49 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
50 import static io.netty.handler.codec.http.HttpHeaderValues.BR;
51 import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
52 import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
53 import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
54 import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
55 import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
56 import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
57 import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;
58
59
60
61
62
63 public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
64
65 public static final int DEFAULT_COMPRESSION_LEVEL = 6;
66 public static final int DEFAULT_WINDOW_BITS = 15;
67 public static final int DEFAULT_MEM_LEVEL = 8;
68
69 private int compressionLevel;
70 private int windowBits;
71 private int memLevel;
72 private final Http2Connection.PropertyKey propertyKey;
73
74 private final boolean supportsCompressionOptions;
75
76 private BrotliOptions brotliOptions;
77 private GzipOptions gzipCompressionOptions;
78 private DeflateOptions deflateOptions;
79 private ZstdOptions zstdOptions;
80 private SnappyOptions snappyOptions;
81
82
83
84
85
86 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
87 this(delegate, defaultCompressionOptions());
88 }
89
90 private static CompressionOptions[] defaultCompressionOptions() {
91 List<CompressionOptions> compressionOptions = new ArrayList<CompressionOptions>();
92 compressionOptions.add(StandardCompressionOptions.gzip());
93 compressionOptions.add(StandardCompressionOptions.deflate());
94 compressionOptions.add(StandardCompressionOptions.snappy());
95 if (Brotli.isAvailable()) {
96 compressionOptions.add(StandardCompressionOptions.brotli());
97 }
98 if (Zstd.isAvailable()) {
99 compressionOptions.add(StandardCompressionOptions.zstd());
100 }
101 return compressionOptions.toArray(new CompressionOptions[0]);
102 }
103
104
105
106
107 @Deprecated
108 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate, int compressionLevel, int windowBits,
109 int memLevel) {
110 super(delegate);
111 this.compressionLevel = ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
112 this.windowBits = ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
113 this.memLevel = ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
114
115 propertyKey = connection().newKey();
116 connection().addListener(new Http2ConnectionAdapter() {
117 @Override
118 public void onStreamRemoved(Http2Stream stream) {
119 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
120 if (compressor != null) {
121 cleanup(stream, compressor);
122 }
123 }
124 });
125
126 supportsCompressionOptions = false;
127 }
128
129
130
131
132
133 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate,
134 CompressionOptions... compressionOptionsArgs) {
135 super(delegate);
136 ObjectUtil.checkNotNull(compressionOptionsArgs, "CompressionOptions");
137 ObjectUtil.deepCheckNotNull("CompressionOptions", compressionOptionsArgs);
138
139 for (CompressionOptions compressionOptions : compressionOptionsArgs) {
140
141
142
143
144
145
146 if (Brotli.isAvailable() && compressionOptions instanceof BrotliOptions) {
147 brotliOptions = (BrotliOptions) compressionOptions;
148 } else if (compressionOptions instanceof GzipOptions) {
149 gzipCompressionOptions = (GzipOptions) compressionOptions;
150 } else if (compressionOptions instanceof DeflateOptions) {
151 deflateOptions = (DeflateOptions) compressionOptions;
152 } else if (compressionOptions instanceof ZstdOptions) {
153 zstdOptions = (ZstdOptions) compressionOptions;
154 } else if (compressionOptions instanceof SnappyOptions) {
155 snappyOptions = (SnappyOptions) compressionOptions;
156 } else {
157 throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
158 ": " + compressionOptions);
159 }
160 }
161
162 supportsCompressionOptions = true;
163
164 propertyKey = connection().newKey();
165 connection().addListener(new Http2ConnectionAdapter() {
166 @Override
167 public void onStreamRemoved(Http2Stream stream) {
168 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
169 if (compressor != null) {
170 cleanup(stream, compressor);
171 }
172 }
173 });
174 }
175
176 @Override
177 public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
178 final boolean endOfStream, ChannelPromise promise) {
179 final Http2Stream stream = connection().stream(streamId);
180 final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
181 if (channel == null) {
182
183 return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
184 }
185
186 try {
187
188 channel.writeOutbound(data);
189 ByteBuf buf = nextReadableBuf(channel);
190 if (buf == null) {
191 if (endOfStream) {
192 if (channel.finish()) {
193 buf = nextReadableBuf(channel);
194 }
195 return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
196 true, promise);
197 }
198
199 promise.setSuccess();
200 return promise;
201 }
202
203 PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
204 for (;;) {
205 ByteBuf nextBuf = nextReadableBuf(channel);
206 boolean compressedEndOfStream = nextBuf == null && endOfStream;
207 if (compressedEndOfStream && channel.finish()) {
208 nextBuf = nextReadableBuf(channel);
209 compressedEndOfStream = nextBuf == null;
210 }
211
212 ChannelPromise bufPromise = ctx.newPromise();
213 combiner.add(bufPromise);
214 super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
215 if (nextBuf == null) {
216 break;
217 }
218
219 padding = 0;
220 buf = nextBuf;
221 }
222 combiner.finish(promise);
223 } catch (Throwable cause) {
224 promise.tryFailure(cause);
225 } finally {
226 if (endOfStream) {
227 cleanup(stream, channel);
228 }
229 }
230 return promise;
231 }
232
233 @Override
234 public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
235 boolean endStream, ChannelPromise promise) {
236 try {
237
238 EmbeddedChannel compressor = newCompressor(ctx, headers, endStream);
239
240
241 ChannelFuture future = super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
242
243
244 bindCompressorToStream(compressor, streamId);
245
246 return future;
247 } catch (Throwable e) {
248 promise.tryFailure(e);
249 }
250 return promise;
251 }
252
253 @Override
254 public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
255 final int streamDependency, final short weight, final boolean exclusive, final int padding,
256 final boolean endOfStream, final ChannelPromise promise) {
257 try {
258
259 EmbeddedChannel compressor = newCompressor(ctx, headers, endOfStream);
260
261
262 ChannelFuture future = super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
263 padding, endOfStream, promise);
264
265
266 bindCompressorToStream(compressor, streamId);
267
268 return future;
269 } catch (Throwable e) {
270 promise.tryFailure(e);
271 }
272 return promise;
273 }
274
275
276
277
278
279
280
281
282
283
284
285 protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSequence contentEncoding)
286 throws Http2Exception {
287 if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
288 return newCompressionChannel(ctx, ZlibWrapper.GZIP);
289 }
290 if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
291 return newCompressionChannel(ctx, ZlibWrapper.ZLIB);
292 }
293 Channel channel = ctx.channel();
294 if (Brotli.isAvailable() && brotliOptions != null && BR.contentEqualsIgnoreCase(contentEncoding)) {
295 return EmbeddedChannel.builder()
296 .channelId(channel.id())
297 .hasDisconnect(channel.metadata().hasDisconnect())
298 .config(channel.config())
299 .handlers(new BrotliEncoder(brotliOptions.parameters()))
300 .build();
301 }
302 if (zstdOptions != null && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
303 return EmbeddedChannel.builder()
304 .channelId(channel.id())
305 .hasDisconnect(channel.metadata().hasDisconnect())
306 .config(channel.config())
307 .handlers(new ZstdEncoder(zstdOptions.compressionLevel(),
308 zstdOptions.blockSize(), zstdOptions.maxEncodeSize()))
309 .build();
310 }
311 if (snappyOptions != null && SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
312 return EmbeddedChannel.builder()
313 .channelId(channel.id())
314 .hasDisconnect(channel.metadata().hasDisconnect())
315 .config(channel.config())
316 .handlers(new SnappyFrameEncoder())
317 .build();
318 }
319
320 return null;
321 }
322
323
324
325
326
327
328
329
330
331 protected CharSequence getTargetContentEncoding(CharSequence contentEncoding) throws Http2Exception {
332 return contentEncoding;
333 }
334
335
336
337
338
339
340 private EmbeddedChannel newCompressionChannel(final ChannelHandlerContext ctx, ZlibWrapper wrapper) {
341 Channel channel = ctx.channel();
342 if (supportsCompressionOptions) {
343 if (wrapper == ZlibWrapper.GZIP && gzipCompressionOptions != null) {
344 return EmbeddedChannel.builder()
345 .channelId(channel.id())
346 .hasDisconnect(channel.metadata().hasDisconnect())
347 .config(channel.config())
348 .handlers(ZlibCodecFactory.newZlibEncoder(wrapper,
349 gzipCompressionOptions.compressionLevel(),
350 gzipCompressionOptions.windowBits(),
351 gzipCompressionOptions.memLevel())
352 )
353 .build();
354 } else if (wrapper == ZlibWrapper.ZLIB && deflateOptions != null) {
355 return EmbeddedChannel.builder()
356 .channelId(channel.id())
357 .hasDisconnect(channel.metadata().hasDisconnect())
358 .config(channel.config())
359 .handlers(ZlibCodecFactory.newZlibEncoder(wrapper,
360 deflateOptions.compressionLevel(),
361 deflateOptions.windowBits(),
362 deflateOptions.memLevel())
363 )
364 .build();
365 } else {
366 throw new IllegalArgumentException("Unsupported ZlibWrapper: " + wrapper);
367 }
368 } else {
369 return EmbeddedChannel.builder()
370 .channelId(channel.id())
371 .hasDisconnect(channel.metadata().hasDisconnect())
372 .config(channel.config())
373 .handlers(ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits, memLevel))
374 .build();
375 }
376 }
377
378
379
380
381
382
383
384
385
386
387
388 private EmbeddedChannel newCompressor(ChannelHandlerContext ctx, Http2Headers headers, boolean endOfStream)
389 throws Http2Exception {
390 if (endOfStream) {
391 return null;
392 }
393
394 CharSequence encoding = headers.get(CONTENT_ENCODING);
395 if (encoding == null) {
396 encoding = IDENTITY;
397 }
398 final EmbeddedChannel compressor = newContentCompressor(ctx, encoding);
399 if (compressor != null) {
400 CharSequence targetContentEncoding = getTargetContentEncoding(encoding);
401 if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
402 headers.remove(CONTENT_ENCODING);
403 } else {
404 headers.set(CONTENT_ENCODING, targetContentEncoding);
405 }
406
407
408
409
410 headers.remove(CONTENT_LENGTH);
411 }
412
413 return compressor;
414 }
415
416
417
418
419
420
421 private void bindCompressorToStream(EmbeddedChannel compressor, int streamId) {
422 if (compressor != null) {
423 Http2Stream stream = connection().stream(streamId);
424 if (stream != null) {
425 stream.setProperty(propertyKey, compressor);
426 }
427 }
428 }
429
430
431
432
433
434
435
436 void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
437 compressor.finishAndReleaseAll();
438 stream.removeProperty(propertyKey);
439 }
440
441
442
443
444
445
446
447 private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
448 for (;;) {
449 final ByteBuf buf = compressor.readOutbound();
450 if (buf == null) {
451 return null;
452 }
453 if (!buf.isReadable()) {
454 buf.release();
455 continue;
456 }
457 return buf;
458 }
459 }
460 }