1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.Unpooled;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.embedded.EmbeddedChannel;
21 import io.netty.handler.codec.ByteToMessageDecoder;
22 import io.netty.util.internal.UnstableApi;
23 import io.netty.handler.codec.compression.Brotli;
24 import io.netty.handler.codec.compression.BrotliDecoder;
25 import io.netty.handler.codec.compression.Zstd;
26 import io.netty.handler.codec.compression.ZstdDecoder;
27 import io.netty.handler.codec.compression.ZlibCodecFactory;
28 import io.netty.handler.codec.compression.ZlibWrapper;
29 import io.netty.handler.codec.compression.SnappyFrameDecoder;
30
31 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
32 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
33 import static io.netty.handler.codec.http.HttpHeaderValues.BR;
34 import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
35 import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
36 import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
37 import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
38 import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
39 import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;
40 import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
41 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
42 import static io.netty.handler.codec.http2.Http2Exception.streamError;
43 import static io.netty.util.internal.ObjectUtil.checkNotNull;
44 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
45
46
47
48
49
50 @UnstableApi
51 public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecorator {
52
53 private final Http2Connection connection;
54 private final boolean strict;
55 private boolean flowControllerInitialized;
56 private final Http2Connection.PropertyKey propertyKey;
57
58 public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener) {
59 this(connection, listener, true);
60 }
61
62 public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener,
63 boolean strict) {
64 super(listener);
65 this.connection = connection;
66 this.strict = strict;
67
68 propertyKey = connection.newKey();
69 connection.addListener(new Http2ConnectionAdapter() {
70 @Override
71 public void onStreamRemoved(Http2Stream stream) {
72 final Http2Decompressor decompressor = decompressor(stream);
73 if (decompressor != null) {
74 cleanup(decompressor);
75 }
76 }
77 });
78 }
79
80 @Override
81 public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
82 throws Http2Exception {
83 final Http2Stream stream = connection.stream(streamId);
84 final Http2Decompressor decompressor = decompressor(stream);
85 if (decompressor == null) {
86
87 return listener.onDataRead(ctx, streamId, data, padding, endOfStream);
88 }
89
90 final EmbeddedChannel channel = decompressor.decompressor();
91 final int compressedBytes = data.readableBytes() + padding;
92 decompressor.incrementCompressedBytes(compressedBytes);
93 try {
94
95 channel.writeInbound(data.retain());
96 ByteBuf buf = nextReadableBuf(channel);
97 if (buf == null && endOfStream && channel.finish()) {
98 buf = nextReadableBuf(channel);
99 }
100 if (buf == null) {
101 if (endOfStream) {
102 listener.onDataRead(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, true);
103 }
104
105
106
107
108 decompressor.incrementDecompressedBytes(compressedBytes);
109 return compressedBytes;
110 }
111 try {
112 Http2LocalFlowController flowController = connection.local().flowController();
113 decompressor.incrementDecompressedBytes(padding);
114 for (;;) {
115 ByteBuf nextBuf = nextReadableBuf(channel);
116 boolean decompressedEndOfStream = nextBuf == null && endOfStream;
117 if (decompressedEndOfStream && channel.finish()) {
118 nextBuf = nextReadableBuf(channel);
119 decompressedEndOfStream = nextBuf == null;
120 }
121
122 decompressor.incrementDecompressedBytes(buf.readableBytes());
123
124
125
126 flowController.consumeBytes(stream,
127 listener.onDataRead(ctx, streamId, buf, padding, decompressedEndOfStream));
128 if (nextBuf == null) {
129 break;
130 }
131
132 padding = 0;
133 buf.release();
134 buf = nextBuf;
135 }
136
137
138
139 return 0;
140 } finally {
141 buf.release();
142 }
143 } catch (Http2Exception e) {
144 throw e;
145 } catch (Throwable t) {
146 throw streamError(stream.id(), INTERNAL_ERROR, t,
147 "Decompressor error detected while delegating data read on streamId %d", stream.id());
148 }
149 }
150
151 @Override
152 public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
153 boolean endStream) throws Http2Exception {
154 initDecompressor(ctx, streamId, headers, endStream);
155 listener.onHeadersRead(ctx, streamId, headers, padding, endStream);
156 }
157
158 @Override
159 public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
160 short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
161 initDecompressor(ctx, streamId, headers, endStream);
162 listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream);
163 }
164
165
166
167
168
169
170
171
172
173
174 protected EmbeddedChannel newContentDecompressor(final ChannelHandlerContext ctx, CharSequence contentEncoding)
175 throws Http2Exception {
176 if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
177 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
178 ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
179 }
180 if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
181 final ZlibWrapper wrapper = strict ? ZlibWrapper.ZLIB : ZlibWrapper.ZLIB_OR_NONE;
182
183 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
184 ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(wrapper));
185 }
186 if (Brotli.isAvailable() && BR.contentEqualsIgnoreCase(contentEncoding)) {
187 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
188 ctx.channel().config(), new BrotliDecoder());
189 }
190 if (SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
191 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
192 ctx.channel().config(), new SnappyFrameDecoder());
193 }
194 if (Zstd.isAvailable() && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
195 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
196 ctx.channel().config(), new ZstdDecoder());
197 }
198
199 return null;
200 }
201
202
203
204
205
206
207
208
209
210 protected CharSequence getTargetContentEncoding(@SuppressWarnings("UnusedParameters") CharSequence contentEncoding)
211 throws Http2Exception {
212 return IDENTITY;
213 }
214
215
216
217
218
219
220
221
222
223
224
225 private void initDecompressor(ChannelHandlerContext ctx, int streamId, Http2Headers headers, boolean endOfStream)
226 throws Http2Exception {
227 final Http2Stream stream = connection.stream(streamId);
228 if (stream == null) {
229 return;
230 }
231
232 Http2Decompressor decompressor = decompressor(stream);
233 if (decompressor == null && !endOfStream) {
234
235 CharSequence contentEncoding = headers.get(CONTENT_ENCODING);
236 if (contentEncoding == null) {
237 contentEncoding = IDENTITY;
238 }
239 final EmbeddedChannel channel = newContentDecompressor(ctx, contentEncoding);
240 if (channel != null) {
241 decompressor = new Http2Decompressor(channel);
242 stream.setProperty(propertyKey, decompressor);
243
244
245 CharSequence targetContentEncoding = getTargetContentEncoding(contentEncoding);
246 if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
247 headers.remove(CONTENT_ENCODING);
248 } else {
249 headers.set(CONTENT_ENCODING, targetContentEncoding);
250 }
251 }
252 }
253
254 if (decompressor != null) {
255
256
257
258 headers.remove(CONTENT_LENGTH);
259
260
261
262 if (!flowControllerInitialized) {
263 flowControllerInitialized = true;
264 connection.local().flowController(new ConsumedBytesConverter(connection.local().flowController()));
265 }
266 }
267 }
268
269 Http2Decompressor decompressor(Http2Stream stream) {
270 return stream == null ? null : (Http2Decompressor) stream.getProperty(propertyKey);
271 }
272
273
274
275
276
277
278 private static void cleanup(Http2Decompressor decompressor) {
279 decompressor.decompressor().finishAndReleaseAll();
280 }
281
282
283
284
285
286
287
288
289 private static ByteBuf nextReadableBuf(EmbeddedChannel decompressor) {
290 for (;;) {
291 final ByteBuf buf = decompressor.readInbound();
292 if (buf == null) {
293 return null;
294 }
295 if (!buf.isReadable()) {
296 buf.release();
297 continue;
298 }
299 return buf;
300 }
301 }
302
303
304
305
306 private final class ConsumedBytesConverter implements Http2LocalFlowController {
307 private final Http2LocalFlowController flowController;
308
309 ConsumedBytesConverter(Http2LocalFlowController flowController) {
310 this.flowController = checkNotNull(flowController, "flowController");
311 }
312
313 @Override
314 public Http2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
315 return flowController.frameWriter(frameWriter);
316 }
317
318 @Override
319 public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
320 flowController.channelHandlerContext(ctx);
321 }
322
323 @Override
324 public void initialWindowSize(int newWindowSize) throws Http2Exception {
325 flowController.initialWindowSize(newWindowSize);
326 }
327
328 @Override
329 public int initialWindowSize() {
330 return flowController.initialWindowSize();
331 }
332
333 @Override
334 public int windowSize(Http2Stream stream) {
335 return flowController.windowSize(stream);
336 }
337
338 @Override
339 public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
340 flowController.incrementWindowSize(stream, delta);
341 }
342
343 @Override
344 public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
345 boolean endOfStream) throws Http2Exception {
346 flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
347 }
348
349 @Override
350 public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
351 Http2Decompressor decompressor = decompressor(stream);
352 if (decompressor != null) {
353
354 numBytes = decompressor.consumeBytes(stream.id(), numBytes);
355 }
356 try {
357 return flowController.consumeBytes(stream, numBytes);
358 } catch (Http2Exception e) {
359 throw e;
360 } catch (Throwable t) {
361
362
363 throw streamError(stream.id(), INTERNAL_ERROR, t, "Error while returning bytes to flow control window");
364 }
365 }
366
367 @Override
368 public int unconsumedBytes(Http2Stream stream) {
369 return flowController.unconsumedBytes(stream);
370 }
371
372 @Override
373 public int initialWindowSize(Http2Stream stream) {
374 return flowController.initialWindowSize(stream);
375 }
376 }
377
378
379
380
381 private static final class Http2Decompressor {
382 private final EmbeddedChannel decompressor;
383 private int compressed;
384 private int decompressed;
385
386 Http2Decompressor(EmbeddedChannel decompressor) {
387 this.decompressor = decompressor;
388 }
389
390
391
392
393 EmbeddedChannel decompressor() {
394 return decompressor;
395 }
396
397
398
399
400 void incrementCompressedBytes(int delta) {
401 assert delta >= 0;
402 compressed += delta;
403 }
404
405
406
407
408 void incrementDecompressedBytes(int delta) {
409 assert delta >= 0;
410 decompressed += delta;
411 }
412
413
414
415
416
417
418
419
420
421 int consumeBytes(int streamId, int decompressedBytes) throws Http2Exception {
422 checkPositiveOrZero(decompressedBytes, "decompressedBytes");
423 if (decompressed - decompressedBytes < 0) {
424 throw streamError(streamId, INTERNAL_ERROR,
425 "Attempting to return too many bytes for stream %d. decompressed: %d " +
426 "decompressedBytes: %d", streamId, decompressed, decompressedBytes);
427 }
428 double consumedRatio = decompressedBytes / (double) decompressed;
429 int consumedCompressed = Math.min(compressed, (int) Math.ceil(compressed * consumedRatio));
430 if (compressed - consumedCompressed < 0) {
431 throw streamError(streamId, INTERNAL_ERROR,
432 "overflow when converting decompressed bytes to compressed bytes for stream %d." +
433 "decompressedBytes: %d decompressed: %d compressed: %d consumedCompressed: %d",
434 streamId, decompressedBytes, decompressed, compressed, consumedCompressed);
435 }
436 decompressed -= decompressedBytes;
437 compressed -= consumedCompressed;
438
439 return consumedCompressed;
440 }
441 }
442 }