View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.Unpooled;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.channel.embedded.EmbeddedChannel;
21  import io.netty.handler.codec.ByteToMessageDecoder;
22  import io.netty.util.internal.UnstableApi;
23  import io.netty.handler.codec.compression.Brotli;
24  import io.netty.handler.codec.compression.BrotliDecoder;
25  import io.netty.handler.codec.compression.Zstd;
26  import io.netty.handler.codec.compression.ZstdDecoder;
27  import io.netty.handler.codec.compression.ZlibCodecFactory;
28  import io.netty.handler.codec.compression.ZlibWrapper;
29  import io.netty.handler.codec.compression.SnappyFrameDecoder;
30  
31  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
32  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
33  import static io.netty.handler.codec.http.HttpHeaderValues.BR;
34  import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
35  import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
36  import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
37  import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
38  import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
39  import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;
40  import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
41  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
42  import static io.netty.handler.codec.http2.Http2Exception.streamError;
43  import static io.netty.util.internal.ObjectUtil.checkNotNull;
44  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
45  
46  /**
47   * An HTTP2 frame listener that will decompress data frames according to the {@code content-encoding} header for each
48   * stream. The decompression provided by this class will be applied to the data for the entire stream.
49   */
50  @UnstableApi
51  public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecorator {
52  
53      private final Http2Connection connection;
54      private final boolean strict;
55      private boolean flowControllerInitialized;
56      private final Http2Connection.PropertyKey propertyKey;
57  
58      public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener) {
59          this(connection, listener, true);
60      }
61  
62      public DelegatingDecompressorFrameListener(Http2Connection connection, Http2FrameListener listener,
63                      boolean strict) {
64          super(listener);
65          this.connection = connection;
66          this.strict = strict;
67  
68          propertyKey = connection.newKey();
69          connection.addListener(new Http2ConnectionAdapter() {
70              @Override
71              public void onStreamRemoved(Http2Stream stream) {
72                  final Http2Decompressor decompressor = decompressor(stream);
73                  if (decompressor != null) {
74                      cleanup(decompressor);
75                  }
76              }
77          });
78      }
79  
80      @Override
81      public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
82              throws Http2Exception {
83          final Http2Stream stream = connection.stream(streamId);
84          final Http2Decompressor decompressor = decompressor(stream);
85          if (decompressor == null) {
86              // The decompressor may be null if no compatible encoding type was found in this stream's headers
87              return listener.onDataRead(ctx, streamId, data, padding, endOfStream);
88          }
89  
90          final EmbeddedChannel channel = decompressor.decompressor();
91          final int compressedBytes = data.readableBytes() + padding;
92          decompressor.incrementCompressedBytes(compressedBytes);
93          try {
94              // call retain here as it will call release after its written to the channel
95              channel.writeInbound(data.retain());
96              ByteBuf buf = nextReadableBuf(channel);
97              if (buf == null && endOfStream && channel.finish()) {
98                  buf = nextReadableBuf(channel);
99              }
100             if (buf == null) {
101                 if (endOfStream) {
102                     listener.onDataRead(ctx, streamId, Unpooled.EMPTY_BUFFER, padding, true);
103                 }
104                 // No new decompressed data was extracted from the compressed data. This means the application could
105                 // not be provided with data and thus could not return how many bytes were processed. We will assume
106                 // there is more data coming which will complete the decompression block. To allow for more data we
107                 // return all bytes to the flow control window (so the peer can send more data).
108                 decompressor.incrementDecompressedBytes(compressedBytes);
109                 return compressedBytes;
110             }
111             try {
112                 Http2LocalFlowController flowController = connection.local().flowController();
113                 decompressor.incrementDecompressedBytes(padding);
114                 for (;;) {
115                     ByteBuf nextBuf = nextReadableBuf(channel);
116                     boolean decompressedEndOfStream = nextBuf == null && endOfStream;
117                     if (decompressedEndOfStream && channel.finish()) {
118                         nextBuf = nextReadableBuf(channel);
119                         decompressedEndOfStream = nextBuf == null;
120                     }
121 
122                     decompressor.incrementDecompressedBytes(buf.readableBytes());
123                     // Immediately return the bytes back to the flow controller. ConsumedBytesConverter will convert
124                     // from the decompressed amount which the user knows about to the compressed amount which flow
125                     // control knows about.
126                     flowController.consumeBytes(stream,
127                             listener.onDataRead(ctx, streamId, buf, padding, decompressedEndOfStream));
128                     if (nextBuf == null) {
129                         break;
130                     }
131 
132                     padding = 0; // Padding is only communicated once on the first iteration.
133                     buf.release();
134                     buf = nextBuf;
135                 }
136                 // We consume bytes each time we call the listener to ensure if multiple frames are decompressed
137                 // that the bytes are accounted for immediately. Otherwise the user may see an inconsistent state of
138                 // flow control.
139                 return 0;
140             } finally {
141                 buf.release();
142             }
143         } catch (Http2Exception e) {
144             throw e;
145         } catch (Throwable t) {
146             throw streamError(stream.id(), INTERNAL_ERROR, t,
147                     "Decompressor error detected while delegating data read on streamId %d", stream.id());
148         }
149     }
150 
151     @Override
152     public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
153                     boolean endStream) throws Http2Exception {
154         initDecompressor(ctx, streamId, headers, endStream);
155         listener.onHeadersRead(ctx, streamId, headers, padding, endStream);
156     }
157 
158     @Override
159     public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
160                     short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
161         initDecompressor(ctx, streamId, headers, endStream);
162         listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream);
163     }
164 
165     /**
166      * Returns a new {@link EmbeddedChannel} that decodes the HTTP2 message content encoded in the specified
167      * {@code contentEncoding}.
168      *
169      * @param contentEncoding the value of the {@code content-encoding} header
170      * @return a new {@link ByteToMessageDecoder} if the specified encoding is supported. {@code null} otherwise
171      *         (alternatively, you can throw a {@link Http2Exception} to block unknown encoding).
172      * @throws Http2Exception If the specified encoding is not supported and warrants an exception
173      */
174     protected EmbeddedChannel newContentDecompressor(final ChannelHandlerContext ctx, CharSequence contentEncoding)
175             throws Http2Exception {
176         if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
177             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
178                     ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
179         }
180         if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
181             final ZlibWrapper wrapper = strict ? ZlibWrapper.ZLIB : ZlibWrapper.ZLIB_OR_NONE;
182             // To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly.
183             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
184                     ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(wrapper));
185         }
186         if (Brotli.isAvailable() && BR.contentEqualsIgnoreCase(contentEncoding)) {
187             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
188               ctx.channel().config(), new BrotliDecoder());
189         }
190         if (SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
191             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
192                     ctx.channel().config(), new SnappyFrameDecoder());
193         }
194         if (Zstd.isAvailable() && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
195             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
196                     ctx.channel().config(), new ZstdDecoder());
197         }
198         // 'identity' or unsupported
199         return null;
200     }
201 
202     /**
203      * Returns the expected content encoding of the decoded content. This getMethod returns {@code "identity"} by
204      * default, which is the case for most decompressors.
205      *
206      * @param contentEncoding the value of the {@code content-encoding} header
207      * @return the expected content encoding of the new content.
208      * @throws Http2Exception if the {@code contentEncoding} is not supported and warrants an exception
209      */
210     protected CharSequence getTargetContentEncoding(@SuppressWarnings("UnusedParameters") CharSequence contentEncoding)
211                     throws Http2Exception {
212         return IDENTITY;
213     }
214 
215     /**
216      * Checks if a new decompressor object is needed for the stream identified by {@code streamId}.
217      * This method will modify the {@code content-encoding} header contained in {@code headers}.
218      *
219      * @param ctx The context
220      * @param streamId The identifier for the headers inside {@code headers}
221      * @param headers Object representing headers which have been read
222      * @param endOfStream Indicates if the stream has ended
223      * @throws Http2Exception If the {@code content-encoding} is not supported
224      */
225     private void initDecompressor(ChannelHandlerContext ctx, int streamId, Http2Headers headers, boolean endOfStream)
226             throws Http2Exception {
227         final Http2Stream stream = connection.stream(streamId);
228         if (stream == null) {
229             return;
230         }
231 
232         Http2Decompressor decompressor = decompressor(stream);
233         if (decompressor == null && !endOfStream) {
234             // Determine the content encoding.
235             CharSequence contentEncoding = headers.get(CONTENT_ENCODING);
236             if (contentEncoding == null) {
237                 contentEncoding = IDENTITY;
238             }
239             final EmbeddedChannel channel = newContentDecompressor(ctx, contentEncoding);
240             if (channel != null) {
241                 decompressor = new Http2Decompressor(channel);
242                 stream.setProperty(propertyKey, decompressor);
243                 // Decode the content and remove or replace the existing headers
244                 // so that the message looks like a decoded message.
245                 CharSequence targetContentEncoding = getTargetContentEncoding(contentEncoding);
246                 if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
247                     headers.remove(CONTENT_ENCODING);
248                 } else {
249                     headers.set(CONTENT_ENCODING, targetContentEncoding);
250                 }
251             }
252         }
253 
254         if (decompressor != null) {
255             // The content length will be for the compressed data. Since we will decompress the data
256             // this content-length will not be correct. Instead of queuing messages or delaying sending
257             // header frames just remove the content-length header.
258             headers.remove(CONTENT_LENGTH);
259 
260             // The first time that we initialize a decompressor, decorate the local flow controller to
261             // properly convert consumed bytes.
262             if (!flowControllerInitialized) {
263                 flowControllerInitialized = true;
264                 connection.local().flowController(new ConsumedBytesConverter(connection.local().flowController()));
265             }
266         }
267     }
268 
269     Http2Decompressor decompressor(Http2Stream stream) {
270         return stream == null ? null : (Http2Decompressor) stream.getProperty(propertyKey);
271     }
272 
273     /**
274      * Release remaining content from the {@link EmbeddedChannel}.
275      *
276      * @param decompressor The decompressor for {@code stream}
277      */
278     private static void cleanup(Http2Decompressor decompressor) {
279         decompressor.decompressor().finishAndReleaseAll();
280     }
281 
282     /**
283      * Read the next decompressed {@link ByteBuf} from the {@link EmbeddedChannel}
284      * or {@code null} if one does not exist.
285      *
286      * @param decompressor The channel to read from
287      * @return The next decoded {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist
288      */
289     private static ByteBuf nextReadableBuf(EmbeddedChannel decompressor) {
290         for (;;) {
291             final ByteBuf buf = decompressor.readInbound();
292             if (buf == null) {
293                 return null;
294             }
295             if (!buf.isReadable()) {
296                 buf.release();
297                 continue;
298             }
299             return buf;
300         }
301     }
302 
303     /**
304      * A decorator around the local flow controller that converts consumed bytes from uncompressed to compressed.
305      */
306     private final class ConsumedBytesConverter implements Http2LocalFlowController {
307         private final Http2LocalFlowController flowController;
308 
309         ConsumedBytesConverter(Http2LocalFlowController flowController) {
310             this.flowController = checkNotNull(flowController, "flowController");
311         }
312 
313         @Override
314         public Http2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
315             return flowController.frameWriter(frameWriter);
316         }
317 
318         @Override
319         public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
320             flowController.channelHandlerContext(ctx);
321         }
322 
323         @Override
324         public void initialWindowSize(int newWindowSize) throws Http2Exception {
325             flowController.initialWindowSize(newWindowSize);
326         }
327 
328         @Override
329         public int initialWindowSize() {
330             return flowController.initialWindowSize();
331         }
332 
333         @Override
334         public int windowSize(Http2Stream stream) {
335             return flowController.windowSize(stream);
336         }
337 
338         @Override
339         public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
340             flowController.incrementWindowSize(stream, delta);
341         }
342 
343         @Override
344         public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
345                 boolean endOfStream) throws Http2Exception {
346             flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
347         }
348 
349         @Override
350         public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
351             Http2Decompressor decompressor = decompressor(stream);
352             if (decompressor != null) {
353                 // Convert the decompressed bytes to compressed (on the wire) bytes.
354                 numBytes = decompressor.consumeBytes(stream.id(), numBytes);
355             }
356             try {
357                 return flowController.consumeBytes(stream, numBytes);
358             } catch (Http2Exception e) {
359                 throw e;
360             } catch (Throwable t) {
361                 // The stream should be closed at this point. We have already changed our state tracking the compressed
362                 // bytes, and there is no guarantee we can recover if the underlying flow controller throws.
363                 throw streamError(stream.id(), INTERNAL_ERROR, t, "Error while returning bytes to flow control window");
364             }
365         }
366 
367         @Override
368         public int unconsumedBytes(Http2Stream stream) {
369             return flowController.unconsumedBytes(stream);
370         }
371 
372         @Override
373         public int initialWindowSize(Http2Stream stream) {
374             return flowController.initialWindowSize(stream);
375         }
376     }
377 
378     /**
379      * Provides the state for stream {@code DATA} frame decompression.
380      */
381     private static final class Http2Decompressor {
382         private final EmbeddedChannel decompressor;
383         private int compressed;
384         private int decompressed;
385 
386         Http2Decompressor(EmbeddedChannel decompressor) {
387             this.decompressor = decompressor;
388         }
389 
390         /**
391          * Responsible for taking compressed bytes in and producing decompressed bytes.
392          */
393         EmbeddedChannel decompressor() {
394             return decompressor;
395         }
396 
397         /**
398          * Increment the number of bytes received prior to doing any decompression.
399          */
400         void incrementCompressedBytes(int delta) {
401             assert delta >= 0;
402             compressed += delta;
403         }
404 
405         /**
406          * Increment the number of bytes after the decompression process.
407          */
408         void incrementDecompressedBytes(int delta) {
409             assert delta >= 0;
410             decompressed += delta;
411         }
412 
413         /**
414          * Determines the ratio between {@code numBytes} and {@link Http2Decompressor#decompressed}.
415          * This ratio is used to decrement {@link Http2Decompressor#decompressed} and
416          * {@link Http2Decompressor#compressed}.
417          * @param streamId the stream ID
418          * @param decompressedBytes The number of post-decompressed bytes to return to flow control
419          * @return The number of pre-decompressed bytes that have been consumed.
420          */
421         int consumeBytes(int streamId, int decompressedBytes) throws Http2Exception {
422             checkPositiveOrZero(decompressedBytes, "decompressedBytes");
423             if (decompressed - decompressedBytes < 0) {
424                 throw streamError(streamId, INTERNAL_ERROR,
425                         "Attempting to return too many bytes for stream %d. decompressed: %d " +
426                                 "decompressedBytes: %d", streamId, decompressed, decompressedBytes);
427             }
428             double consumedRatio = decompressedBytes / (double) decompressed;
429             int consumedCompressed = Math.min(compressed, (int) Math.ceil(compressed * consumedRatio));
430             if (compressed - consumedCompressed < 0) {
431                 throw streamError(streamId, INTERNAL_ERROR,
432                         "overflow when converting decompressed bytes to compressed bytes for stream %d." +
433                                 "decompressedBytes: %d decompressed: %d compressed: %d consumedCompressed: %d",
434                         streamId, decompressedBytes, decompressed, compressed, consumedCompressed);
435             }
436             decompressed -= decompressedBytes;
437             compressed -= consumedCompressed;
438 
439             return consumedCompressed;
440         }
441     }
442 }