1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty5.handler.codec.http2;
16
17 import io.netty5.buffer.api.Buffer;
18 import io.netty5.channel.ChannelHandlerContext;
19 import io.netty5.channel.embedded.EmbeddedChannel;
20 import io.netty5.handler.codec.compression.Brotli;
21 import io.netty5.handler.codec.compression.BrotliDecompressor;
22 import io.netty5.handler.codec.compression.Decompressor;
23 import io.netty5.handler.codec.compression.ZlibDecompressor;
24 import io.netty5.handler.codec.compression.ZlibWrapper;
25 import io.netty5.util.internal.UnstableApi;
26
27 import static io.netty5.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
28 import static io.netty5.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
29 import static io.netty5.handler.codec.http.HttpHeaderValues.BR;
30 import static io.netty5.handler.codec.http.HttpHeaderValues.DEFLATE;
31 import static io.netty5.handler.codec.http.HttpHeaderValues.GZIP;
32 import static io.netty5.handler.codec.http.HttpHeaderValues.IDENTITY;
33 import static io.netty5.handler.codec.http.HttpHeaderValues.X_DEFLATE;
34 import static io.netty5.handler.codec.http.HttpHeaderValues.X_GZIP;
35 import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
36 import static io.netty5.handler.codec.http2.Http2Exception.streamError;
37 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
38 import static java.util.Objects.requireNonNull;
39
40
41
42
43
44 @UnstableApi
45 public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecorator {
46
47 private final Http2Connection connection;
48 private final boolean strict;
49 private boolean flowControllerInitialized;
50 private final Http2Connection.PropertyKey propertyKey;
51
52 public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener) {
53 this(connection, listener, true);
54 }
55
56 public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener,
57 boolean strict) {
58 super(listener);
59 this.connection = connection;
60 this.strict = strict;
61
62 propertyKey = connection.newKey();
63 connection.addListener(new Http2ConnectionAdapter() {
64 @Override
65 public void onStreamRemoved(Http2Stream stream) {
66 final Http2Decompressor decompressor = decompressor(stream);
67 if (decompressor != null) {
68 cleanup(decompressor);
69 }
70 }
71 });
72 }
73
74 @Override
75 public int onDataRead(ChannelHandlerContext ctx, int streamId, Buffer data, int padding, boolean endOfStream)
76 throws Http2Exception {
77 final Http2Stream stream = connection.stream(streamId);
78 final Http2Decompressor decompressor = decompressor(stream);
79 if (decompressor == null) {
80
81 return listener.onDataRead(ctx, streamId, data, padding, endOfStream);
82 }
83
84 final Decompressor decomp = decompressor.decompressor();
85 final int compressedBytes = data.readableBytes() + padding;
86 decompressor.incrementCompressedBytes(compressedBytes);
87 Buffer decompressed = null;
88 try {
89 Http2LocalFlowController flowController = connection.local().flowController();
90 decompressor.incrementDecompressedBytes(padding);
91
92 for (;;) {
93 if (decomp.isFinished()) {
94 flowController.consumeBytes(stream, listener.onDataRead(
95 ctx, streamId, ctx.bufferAllocator().allocate(0), padding, endOfStream));
96 break;
97 }
98 int idx = data.readerOffset();
99 decompressed = decomp.decompress(data, ctx.bufferAllocator());
100 if (decompressed == null || idx == data.readerOffset()) {
101 flowController.consumeBytes(stream, listener.onDataRead(
102 ctx, streamId, ctx.bufferAllocator().allocate(0), padding, endOfStream));
103 break;
104 } else {
105
106
107
108 decompressor.incrementDecompressedBytes(decompressed.readableBytes());
109 flowController.consumeBytes(stream,
110 listener.onDataRead(ctx, streamId, decompressed, padding, false));
111 decompressed.close();
112 decompressed = null;
113 }
114 padding = 0;
115 }
116
117
118
119 return 0;
120 } catch (Http2Exception e) {
121 throw e;
122 } catch (Throwable t) {
123 throw streamError(stream.id(), INTERNAL_ERROR, t,
124 "Decompressor error detected while delegating data read on streamId %d", stream.id());
125 } finally {
126 if (decompressed != null) {
127 decompressed.close();
128 }
129 }
130 }
131
132 @Override
133 public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
134 boolean endStream) throws Http2Exception {
135 initDecompressor(ctx, streamId, headers, endStream);
136 listener.onHeadersRead(ctx, streamId, headers, padding, endStream);
137 }
138
139 @Override
140 public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
141 short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
142 initDecompressor(ctx, streamId, headers, endStream);
143 listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream);
144 }
145
146
147
148
149
150
151
152
153
154
155 protected Decompressor newContentDecompressor(final ChannelHandlerContext ctx, CharSequence contentEncoding)
156 throws Http2Exception {
157 if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
158 return ZlibDecompressor.newFactory(ZlibWrapper.GZIP).get();
159 }
160 if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
161 final ZlibWrapper wrapper = strict ? ZlibWrapper.ZLIB : ZlibWrapper.ZLIB_OR_NONE;
162
163 return ZlibDecompressor.newFactory(wrapper).get();
164 }
165 if (Brotli.isAvailable() && BR.contentEqualsIgnoreCase(contentEncoding)) {
166 return BrotliDecompressor.newFactory().get();
167 }
168
169 return null;
170 }
171
172
173
174
175
176
177
178
179
180 protected CharSequence getTargetContentEncoding(@SuppressWarnings("UnusedParameters") CharSequence contentEncoding)
181 throws Http2Exception {
182 return IDENTITY;
183 }
184
185
186
187
188
189
190
191
192
193
194
195 private void initDecompressor(ChannelHandlerContext ctx, int streamId, Http2Headers headers, boolean endOfStream)
196 throws Http2Exception {
197 final Http2Stream stream = connection.stream(streamId);
198 if (stream == null) {
199 return;
200 }
201
202 Http2Decompressor decompressor = decompressor(stream);
203 if (decompressor == null && !endOfStream) {
204
205 CharSequence contentEncoding = headers.get(CONTENT_ENCODING);
206 if (contentEncoding == null) {
207 contentEncoding = IDENTITY;
208 }
209 final Decompressor decomp = newContentDecompressor(ctx, contentEncoding);
210 if (decomp != null) {
211 decompressor = new Http2Decompressor(decomp);
212 stream.setProperty(propertyKey, decompressor);
213
214
215 CharSequence targetContentEncoding = getTargetContentEncoding(contentEncoding);
216 if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
217 headers.remove(CONTENT_ENCODING);
218 } else {
219 headers.set(CONTENT_ENCODING, targetContentEncoding);
220 }
221 }
222 }
223
224 if (decompressor != null) {
225
226
227
228 headers.remove(CONTENT_LENGTH);
229
230
231
232 if (!flowControllerInitialized) {
233 flowControllerInitialized = true;
234 connection.local().flowController(new ConsumedBytesConverter(connection.local().flowController()));
235 }
236 }
237 }
238
239 Http2Decompressor decompressor(Http2Stream stream) {
240 return stream == null ? null : (Http2Decompressor) stream.getProperty(propertyKey);
241 }
242
243
244
245
246
247
248 private static void cleanup(Http2Decompressor decompressor) {
249 decompressor.decompressor().close();
250 }
251
252
253
254
255 private final class ConsumedBytesConverter implements Http2LocalFlowController {
256 private final Http2LocalFlowController flowController;
257
258 ConsumedBytesConverter(Http2LocalFlowController flowController) {
259 this.flowController = requireNonNull(flowController, "flowController");
260 }
261
262 @Override
263 public Http2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
264 return flowController.frameWriter(frameWriter);
265 }
266
267 @Override
268 public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
269 flowController.channelHandlerContext(ctx);
270 }
271
272 @Override
273 public void initialWindowSize(int newWindowSize) throws Http2Exception {
274 flowController.initialWindowSize(newWindowSize);
275 }
276
277 @Override
278 public int initialWindowSize() {
279 return flowController.initialWindowSize();
280 }
281
282 @Override
283 public int windowSize(Http2Stream stream) {
284 return flowController.windowSize(stream);
285 }
286
287 @Override
288 public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
289 flowController.incrementWindowSize(stream, delta);
290 }
291
292 @Override
293 public void receiveFlowControlledFrame(Http2Stream stream, Buffer data, int padding,
294 boolean endOfStream) throws Http2Exception {
295 flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
296 }
297
298 @Override
299 public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
300 Http2Decompressor decompressor = decompressor(stream);
301 if (decompressor != null) {
302
303 numBytes = decompressor.consumeBytes(stream.id(), numBytes);
304 }
305 try {
306 return flowController.consumeBytes(stream, numBytes);
307 } catch (Http2Exception e) {
308 throw e;
309 } catch (Throwable t) {
310
311
312 throw streamError(stream.id(), INTERNAL_ERROR, t, "Error while returning bytes to flow control window");
313 }
314 }
315
316 @Override
317 public int unconsumedBytes(Http2Stream stream) {
318 return flowController.unconsumedBytes(stream);
319 }
320
321 @Override
322 public int initialWindowSize(Http2Stream stream) {
323 return flowController.initialWindowSize(stream);
324 }
325 }
326
327
328
329
330 private static final class Http2Decompressor {
331 private final Decompressor decompressor;
332 private int compressed;
333 private int decompressed;
334
335 Http2Decompressor(Decompressor decompressor) {
336 this.decompressor = decompressor;
337 }
338
339
340
341
342 Decompressor decompressor() {
343 return decompressor;
344 }
345
346
347
348
349 void incrementCompressedBytes(int delta) {
350 assert delta >= 0;
351 compressed += delta;
352 }
353
354
355
356
357 void incrementDecompressedBytes(int delta) {
358 assert delta >= 0;
359 decompressed += delta;
360 }
361
362
363
364
365
366
367
368
369
370 int consumeBytes(int streamId, int decompressedBytes) throws Http2Exception {
371 checkPositiveOrZero(decompressedBytes, "decompressedBytes");
372 if (decompressed - decompressedBytes < 0) {
373 throw streamError(streamId, INTERNAL_ERROR,
374 "Attempting to return too many bytes for stream %d. decompressed: %d " +
375 "decompressedBytes: %d", streamId, decompressed, decompressedBytes);
376 }
377 double consumedRatio = decompressedBytes / (double) decompressed;
378 int consumedCompressed = Math.min(compressed, (int) Math.ceil(compressed * consumedRatio));
379 if (compressed - consumedCompressed < 0) {
380 throw streamError(streamId, INTERNAL_ERROR,
381 "overflow when converting decompressed bytes to compressed bytes for stream %d." +
382 "decompressedBytes: %d decompressed: %d compressed: %d consumedCompressed: %d",
383 streamId, decompressedBytes, decompressed, compressed, consumedCompressed);
384 }
385 decompressed -= decompressedBytes;
386 compressed -= consumedCompressed;
387
388 return consumedCompressed;
389 }
390 }
391 }