1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.Unpooled;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.channel.embedded.EmbeddedChannel;
23 import io.netty.handler.codec.ByteToMessageDecoder;
24 import io.netty.handler.codec.compression.BrotliEncoder;
25 import io.netty.handler.codec.compression.ZlibCodecFactory;
26 import io.netty.handler.codec.compression.ZlibWrapper;
27 import io.netty.handler.codec.compression.Brotli;
28 import io.netty.handler.codec.compression.BrotliOptions;
29 import io.netty.handler.codec.compression.CompressionOptions;
30 import io.netty.handler.codec.compression.DeflateOptions;
31 import io.netty.handler.codec.compression.GzipOptions;
32 import io.netty.handler.codec.compression.StandardCompressionOptions;
33 import io.netty.handler.codec.compression.ZstdEncoder;
34 import io.netty.handler.codec.compression.ZstdOptions;
35 import io.netty.util.concurrent.PromiseCombiner;
36 import io.netty.util.internal.ObjectUtil;
37 import io.netty.util.internal.UnstableApi;
38
39 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
40 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
41 import static io.netty.handler.codec.http.HttpHeaderValues.BR;
42 import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
43 import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
44 import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
45 import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
46 import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
47 import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
48
49
50
51
52
53 @UnstableApi
54 public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
55
56 public static final int DEFAULT_COMPRESSION_LEVEL = 6;
57 public static final int DEFAULT_WINDOW_BITS = 15;
58 public static final int DEFAULT_MEM_LEVEL = 8;
59
60 private int compressionLevel;
61 private int windowBits;
62 private int memLevel;
63 private final Http2Connection.PropertyKey propertyKey;
64
65 private final boolean supportsCompressionOptions;
66
67 private BrotliOptions brotliOptions;
68 private GzipOptions gzipCompressionOptions;
69 private DeflateOptions deflateOptions;
70 private ZstdOptions zstdOptions;
71
72
73
74
75
76 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
77 this(delegate, defaultCompressionOptions());
78 }
79
80 private static CompressionOptions[] defaultCompressionOptions() {
81 if (Brotli.isAvailable()) {
82 return new CompressionOptions[] {
83 StandardCompressionOptions.brotli(),
84 StandardCompressionOptions.gzip(),
85 StandardCompressionOptions.deflate() };
86 }
87 return new CompressionOptions[] { StandardCompressionOptions.gzip(), StandardCompressionOptions.deflate() };
88 }
89
90
91
92
93 @Deprecated
94 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate, int compressionLevel, int windowBits,
95 int memLevel) {
96 super(delegate);
97 this.compressionLevel = ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
98 this.windowBits = ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
99 this.memLevel = ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
100
101 propertyKey = connection().newKey();
102 connection().addListener(new Http2ConnectionAdapter() {
103 @Override
104 public void onStreamRemoved(Http2Stream stream) {
105 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
106 if (compressor != null) {
107 cleanup(stream, compressor);
108 }
109 }
110 });
111
112 supportsCompressionOptions = false;
113 }
114
115
116
117
118
119 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate,
120 CompressionOptions... compressionOptionsArgs) {
121 super(delegate);
122 ObjectUtil.checkNotNull(compressionOptionsArgs, "CompressionOptions");
123 ObjectUtil.deepCheckNotNull("CompressionOptions", compressionOptionsArgs);
124
125 for (CompressionOptions compressionOptions : compressionOptionsArgs) {
126
127
128
129
130
131
132 if (Brotli.isAvailable() && compressionOptions instanceof BrotliOptions) {
133 brotliOptions = (BrotliOptions) compressionOptions;
134 } else if (compressionOptions instanceof GzipOptions) {
135 gzipCompressionOptions = (GzipOptions) compressionOptions;
136 } else if (compressionOptions instanceof DeflateOptions) {
137 deflateOptions = (DeflateOptions) compressionOptions;
138 } else if (compressionOptions instanceof ZstdOptions) {
139 zstdOptions = (ZstdOptions) compressionOptions;
140 } else {
141 throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
142 ": " + compressionOptions);
143 }
144 }
145
146 supportsCompressionOptions = true;
147
148 propertyKey = connection().newKey();
149 connection().addListener(new Http2ConnectionAdapter() {
150 @Override
151 public void onStreamRemoved(Http2Stream stream) {
152 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
153 if (compressor != null) {
154 cleanup(stream, compressor);
155 }
156 }
157 });
158 }
159
160 @Override
161 public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
162 final boolean endOfStream, ChannelPromise promise) {
163 final Http2Stream stream = connection().stream(streamId);
164 final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
165 if (channel == null) {
166
167 return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
168 }
169
170 try {
171
172 channel.writeOutbound(data);
173 ByteBuf buf = nextReadableBuf(channel);
174 if (buf == null) {
175 if (endOfStream) {
176 if (channel.finish()) {
177 buf = nextReadableBuf(channel);
178 }
179 return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
180 true, promise);
181 }
182
183 promise.setSuccess();
184 return promise;
185 }
186
187 PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
188 for (;;) {
189 ByteBuf nextBuf = nextReadableBuf(channel);
190 boolean compressedEndOfStream = nextBuf == null && endOfStream;
191 if (compressedEndOfStream && channel.finish()) {
192 nextBuf = nextReadableBuf(channel);
193 compressedEndOfStream = nextBuf == null;
194 }
195
196 ChannelPromise bufPromise = ctx.newPromise();
197 combiner.add(bufPromise);
198 super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
199 if (nextBuf == null) {
200 break;
201 }
202
203 padding = 0;
204 buf = nextBuf;
205 }
206 combiner.finish(promise);
207 } catch (Throwable cause) {
208 promise.tryFailure(cause);
209 } finally {
210 if (endOfStream) {
211 cleanup(stream, channel);
212 }
213 }
214 return promise;
215 }
216
217 @Override
218 public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
219 boolean endStream, ChannelPromise promise) {
220 try {
221
222 EmbeddedChannel compressor = newCompressor(ctx, headers, endStream);
223
224
225 ChannelFuture future = super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
226
227
228 bindCompressorToStream(compressor, streamId);
229
230 return future;
231 } catch (Throwable e) {
232 promise.tryFailure(e);
233 }
234 return promise;
235 }
236
237 @Override
238 public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
239 final int streamDependency, final short weight, final boolean exclusive, final int padding,
240 final boolean endOfStream, final ChannelPromise promise) {
241 try {
242
243 EmbeddedChannel compressor = newCompressor(ctx, headers, endOfStream);
244
245
246 ChannelFuture future = super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
247 padding, endOfStream, promise);
248
249
250 bindCompressorToStream(compressor, streamId);
251
252 return future;
253 } catch (Throwable e) {
254 promise.tryFailure(e);
255 }
256 return promise;
257 }
258
259
260
261
262
263
264
265
266
267
268
269 protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSequence contentEncoding)
270 throws Http2Exception {
271 if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
272 return newCompressionChannel(ctx, ZlibWrapper.GZIP);
273 }
274 if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
275 return newCompressionChannel(ctx, ZlibWrapper.ZLIB);
276 }
277 if (Brotli.isAvailable() && brotliOptions != null && BR.contentEqualsIgnoreCase(contentEncoding)) {
278 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
279 ctx.channel().config(), new BrotliEncoder(brotliOptions.parameters()));
280 }
281 if (zstdOptions != null && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
282 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
283 ctx.channel().config(), new ZstdEncoder(zstdOptions.compressionLevel(),
284 zstdOptions.blockSize(), zstdOptions.maxEncodeSize()));
285 }
286
287 return null;
288 }
289
290
291
292
293
294
295
296
297
298 protected CharSequence getTargetContentEncoding(CharSequence contentEncoding) throws Http2Exception {
299 return contentEncoding;
300 }
301
302
303
304
305
306
307 private EmbeddedChannel newCompressionChannel(final ChannelHandlerContext ctx, ZlibWrapper wrapper) {
308 if (supportsCompressionOptions) {
309 if (wrapper == ZlibWrapper.GZIP && gzipCompressionOptions != null) {
310 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
311 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
312 gzipCompressionOptions.compressionLevel(), gzipCompressionOptions.windowBits(),
313 gzipCompressionOptions.memLevel()));
314 } else if (wrapper == ZlibWrapper.ZLIB && deflateOptions != null) {
315 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
316 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
317 deflateOptions.compressionLevel(), deflateOptions.windowBits(),
318 deflateOptions.memLevel()));
319 } else {
320 throw new IllegalArgumentException("Unsupported ZlibWrapper: " + wrapper);
321 }
322 } else {
323 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
324 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits,
325 memLevel));
326 }
327 }
328
329
330
331
332
333
334
335
336
337
338
339 private EmbeddedChannel newCompressor(ChannelHandlerContext ctx, Http2Headers headers, boolean endOfStream)
340 throws Http2Exception {
341 if (endOfStream) {
342 return null;
343 }
344
345 CharSequence encoding = headers.get(CONTENT_ENCODING);
346 if (encoding == null) {
347 encoding = IDENTITY;
348 }
349 final EmbeddedChannel compressor = newContentCompressor(ctx, encoding);
350 if (compressor != null) {
351 CharSequence targetContentEncoding = getTargetContentEncoding(encoding);
352 if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
353 headers.remove(CONTENT_ENCODING);
354 } else {
355 headers.set(CONTENT_ENCODING, targetContentEncoding);
356 }
357
358
359
360
361 headers.remove(CONTENT_LENGTH);
362 }
363
364 return compressor;
365 }
366
367
368
369
370
371
372 private void bindCompressorToStream(EmbeddedChannel compressor, int streamId) {
373 if (compressor != null) {
374 Http2Stream stream = connection().stream(streamId);
375 if (stream != null) {
376 stream.setProperty(propertyKey, compressor);
377 }
378 }
379 }
380
381
382
383
384
385
386
387 void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
388 compressor.finishAndReleaseAll();
389 stream.removeProperty(propertyKey);
390 }
391
392
393
394
395
396
397
398 private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
399 for (;;) {
400 final ByteBuf buf = compressor.readOutbound();
401 if (buf == null) {
402 return null;
403 }
404 if (!buf.isReadable()) {
405 buf.release();
406 continue;
407 }
408 return buf;
409 }
410 }
411 }