1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.Unpooled;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.ChannelInboundHandlerAdapter;
21 import io.netty.channel.embedded.EmbeddedChannel;
22 import io.netty.handler.codec.ByteToMessageDecoder;
23 import io.netty.handler.codec.compression.Brotli;
24 import io.netty.handler.codec.compression.BrotliDecoder;
25 import io.netty.handler.codec.compression.Zstd;
26 import io.netty.handler.codec.compression.ZstdDecoder;
27 import io.netty.handler.codec.compression.ZlibCodecFactory;
28 import io.netty.handler.codec.compression.ZlibWrapper;
29 import io.netty.handler.codec.compression.SnappyFrameDecoder;
30
31 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
32 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
33 import static io.netty.handler.codec.http.HttpHeaderValues.BR;
34 import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
35 import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
36 import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
37 import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
38 import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
39 import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;
40 import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
41 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
42 import static io.netty.handler.codec.http2.Http2Exception.streamError;
43 import static io.netty.util.internal.ObjectUtil.checkNotNull;
44 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
45
46
47
48
49
50 public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecorator {
51
52 private final Http2Connection connection;
53 private final boolean strict;
54 private boolean flowControllerInitialized;
55 private final Http2Connection.PropertyKey propertyKey;
56 private final int maxAllocation;
57
58
59
60
61
62
63
64
65
66
67 @Deprecated
68 public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener) {
69 this(connection, listener, 0);
70 }
71
72
73
74
75
76
77
78
79
80 public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener,
81 int maxAllocation) {
82 this(connection, listener, true, maxAllocation);
83 }
84
85
86
87
88
89
90
91
92
93
94
95
96 @Deprecated
97 public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener,
98 boolean strict) {
99 this(connection, listener, strict, 0);
100 }
101
102
103
104
105
106
107
108
109
110
111
112 public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener,
113 boolean strict, int maxAllocation) {
114 super(listener);
115 this.connection = connection;
116 this.strict = strict;
117 this.maxAllocation = checkPositiveOrZero(maxAllocation, "maxAllocation");
118
119 propertyKey = connection.newKey();
120 connection.addListener(new Http2ConnectionAdapter() {
121 @Override
122 public void onStreamRemoved(Http2Stream stream) {
123 final Http2Decompressor decompressor = decompressor(stream);
124 if (decompressor != null) {
125 decompressor.cleanup();
126 }
127 }
128 });
129 }
130
131 @Override
132 public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
133 throws Http2Exception {
134 final Http2Stream stream = connection.stream(streamId);
135 final Http2Decompressor decompressor = decompressor(stream);
136 if (decompressor == null) {
137
138 return listener.onDataRead(ctx, streamId, data, padding, endOfStream);
139 }
140 return decompressor.decompress(ctx, stream, data, padding, endOfStream);
141 }
142
143 @Override
144 public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
145 boolean endStream) throws Http2Exception {
146 initDecompressor(ctx, streamId, headers, endStream);
147 listener.onHeadersRead(ctx, streamId, headers, padding, endStream);
148 }
149
150 @Override
151 public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
152 short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
153 initDecompressor(ctx, streamId, headers, endStream);
154 listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream);
155 }
156
157
158
159
160
161
162
163
164
165
166 protected EmbeddedChannel newContentDecompressor(final ChannelHandlerContext ctx, CharSequence contentEncoding)
167 throws Http2Exception {
168 if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
169 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
170 ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP, maxAllocation));
171 }
172 if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
173 final ZlibWrapper wrapper = strict ? ZlibWrapper.ZLIB : ZlibWrapper.ZLIB_OR_NONE;
174
175 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
176 ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(wrapper, maxAllocation));
177 }
178 if (Brotli.isAvailable() && BR.contentEqualsIgnoreCase(contentEncoding)) {
179 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
180 ctx.channel().config(), new BrotliDecoder());
181 }
182 if (SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
183 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
184 ctx.channel().config(), new SnappyFrameDecoder());
185 }
186 if (Zstd.isAvailable() && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
187 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
188 ctx.channel().config(), new ZstdDecoder());
189 }
190
191 return null;
192 }
193
194
195
196
197
198
199
200
201
202 protected CharSequence getTargetContentEncoding(@SuppressWarnings("UnusedParameters") CharSequence contentEncoding)
203 throws Http2Exception {
204 return IDENTITY;
205 }
206
207
208
209
210
211
212
213
214
215
216
217 private void initDecompressor(ChannelHandlerContext ctx, int streamId, Http2Headers headers, boolean endOfStream)
218 throws Http2Exception {
219 final Http2Stream stream = connection.stream(streamId);
220 if (stream == null) {
221 return;
222 }
223
224 Http2Decompressor decompressor = decompressor(stream);
225 if (decompressor == null && !endOfStream) {
226
227 CharSequence contentEncoding = headers.get(CONTENT_ENCODING);
228 if (contentEncoding == null) {
229 contentEncoding = IDENTITY;
230 }
231 final EmbeddedChannel channel = newContentDecompressor(ctx, contentEncoding);
232 if (channel != null) {
233 decompressor = new Http2Decompressor(channel, connection, listener);
234 stream.setProperty(propertyKey, decompressor);
235
236
237 CharSequence targetContentEncoding = getTargetContentEncoding(contentEncoding);
238 if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
239 headers.remove(CONTENT_ENCODING);
240 } else {
241 headers.set(CONTENT_ENCODING, targetContentEncoding);
242 }
243 }
244 }
245
246 if (decompressor != null) {
247
248
249
250 headers.remove(CONTENT_LENGTH);
251
252
253
254 if (!flowControllerInitialized) {
255 flowControllerInitialized = true;
256 connection.local().flowController(new ConsumedBytesConverter(connection.local().flowController()));
257 }
258 }
259 }
260
261 Http2Decompressor decompressor(Http2Stream stream) {
262 return stream == null ? null : (Http2Decompressor) stream.getProperty(propertyKey);
263 }
264
265
266
267
268 private final class ConsumedBytesConverter implements Http2LocalFlowController {
269 private final Http2LocalFlowController flowController;
270
271 ConsumedBytesConverter(Http2LocalFlowController flowController) {
272 this.flowController = checkNotNull(flowController, "flowController");
273 }
274
275 @Override
276 public Http2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
277 return flowController.frameWriter(frameWriter);
278 }
279
280 @Override
281 public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
282 flowController.channelHandlerContext(ctx);
283 }
284
285 @Override
286 public void initialWindowSize(int newWindowSize) throws Http2Exception {
287 flowController.initialWindowSize(newWindowSize);
288 }
289
290 @Override
291 public int initialWindowSize() {
292 return flowController.initialWindowSize();
293 }
294
295 @Override
296 public int windowSize(Http2Stream stream) {
297 return flowController.windowSize(stream);
298 }
299
300 @Override
301 public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
302 flowController.incrementWindowSize(stream, delta);
303 }
304
305 @Override
306 public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
307 boolean endOfStream) throws Http2Exception {
308 flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
309 }
310
311 @Override
312 public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
313 Http2Decompressor decompressor = decompressor(stream);
314 if (decompressor != null) {
315
316 numBytes = decompressor.consumeBytes(stream.id(), numBytes);
317 }
318 try {
319 return flowController.consumeBytes(stream, numBytes);
320 } catch (Http2Exception e) {
321 throw e;
322 } catch (Throwable t) {
323
324
325 throw streamError(stream.id(), INTERNAL_ERROR, t, "Error while returning bytes to flow control window");
326 }
327 }
328
329 @Override
330 public int unconsumedBytes(Http2Stream stream) {
331 return flowController.unconsumedBytes(stream);
332 }
333
334 @Override
335 public int initialWindowSize(Http2Stream stream) {
336 return flowController.initialWindowSize(stream);
337 }
338 }
339
340
341
342
343 private static final class Http2Decompressor {
344 private final EmbeddedChannel decompressor;
345
346 private int compressed;
347 private int decompressed;
348 private Http2Stream stream;
349 private int padding;
350 private boolean dataDecompressed;
351 private ChannelHandlerContext targetCtx;
352
353 Http2Decompressor(EmbeddedChannel decompressor,
354 final Http2Connection connection, final Http2FrameListener listener) {
355 this.decompressor = decompressor;
356 this.decompressor.pipeline().addLast(new ChannelInboundHandlerAdapter() {
357 @Override
358 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
359 ByteBuf buf = (ByteBuf) msg;
360 if (!buf.isReadable()) {
361 buf.release();
362 return;
363 }
364 incrementDecompressedBytes(buf.readableBytes());
365
366
367
368 connection.local().flowController().consumeBytes(stream,
369 listener.onDataRead(targetCtx, stream.id(), buf, padding, false));
370 padding = 0;
371 buf.release();
372
373 dataDecompressed = true;
374 }
375
376 @Override
377 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
378 listener.onDataRead(targetCtx, stream.id(), Unpooled.EMPTY_BUFFER, padding, true);
379 }
380 });
381 }
382
383
384
385
386 void cleanup() {
387 decompressor.finishAndReleaseAll();
388 }
389
390 int decompress(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data, int padding, boolean endOfStream)
391 throws Http2Exception {
392 final int compressedBytes = data.readableBytes() + padding;
393 incrementCompressedBytes(compressedBytes);
394 try {
395 this.stream = stream;
396 this.padding = padding;
397 this.dataDecompressed = false;
398 this.targetCtx = ctx;
399
400
401 decompressor.writeInbound(data.retain());
402 if (endOfStream) {
403 decompressor.finish();
404
405 if (!dataDecompressed) {
406
407
408
409
410
411 incrementDecompressedBytes(compressedBytes);
412 return compressedBytes;
413 }
414 }
415
416
417
418 return 0;
419 } catch (Throwable t) {
420
421 if (t instanceof Http2Exception) {
422 throw (Http2Exception) t;
423 }
424 throw streamError(stream.id(), INTERNAL_ERROR, t,
425 "Decompressor error detected while delegating data read on streamId %d", stream.id());
426 }
427 }
428
429
430
431 private void incrementCompressedBytes(int delta) {
432 assert delta >= 0;
433 compressed += delta;
434 }
435
436
437
438
439 private void incrementDecompressedBytes(int delta) {
440 assert delta >= 0;
441 decompressed += delta;
442 }
443
444
445
446
447
448
449
450
451
452 int consumeBytes(int streamId, int decompressedBytes) throws Http2Exception {
453 checkPositiveOrZero(decompressedBytes, "decompressedBytes");
454 if (decompressed - decompressedBytes < 0) {
455 throw streamError(streamId, INTERNAL_ERROR,
456 "Attempting to return too many bytes for stream %d. decompressed: %d " +
457 "decompressedBytes: %d", streamId, decompressed, decompressedBytes);
458 }
459 double consumedRatio = decompressedBytes / (double) decompressed;
460 int consumedCompressed = Math.min(compressed, (int) Math.ceil(compressed * consumedRatio));
461 if (compressed - consumedCompressed < 0) {
462 throw streamError(streamId, INTERNAL_ERROR,
463 "overflow when converting decompressed bytes to compressed bytes for stream %d." +
464 "decompressedBytes: %d decompressed: %d compressed: %d consumedCompressed: %d",
465 streamId, decompressedBytes, decompressed, compressed, consumedCompressed);
466 }
467 decompressed -= decompressedBytes;
468 compressed -= consumedCompressed;
469
470 return consumedCompressed;
471 }
472 }
473 }