View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.Unpooled;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.channel.embedded.EmbeddedChannel;
23  import io.netty.handler.codec.ByteToMessageDecoder;
24  import io.netty.handler.codec.compression.BrotliEncoder;
25  import io.netty.handler.codec.compression.ZlibCodecFactory;
26  import io.netty.handler.codec.compression.ZlibWrapper;
27  import io.netty.handler.codec.compression.Brotli;
28  import io.netty.handler.codec.compression.BrotliOptions;
29  import io.netty.handler.codec.compression.CompressionOptions;
30  import io.netty.handler.codec.compression.DeflateOptions;
31  import io.netty.handler.codec.compression.GzipOptions;
32  import io.netty.handler.codec.compression.StandardCompressionOptions;
33  import io.netty.handler.codec.compression.ZstdEncoder;
34  import io.netty.handler.codec.compression.ZstdOptions;
35  import io.netty.util.concurrent.PromiseCombiner;
36  import io.netty.util.internal.ObjectUtil;
37  import io.netty.util.internal.UnstableApi;
38  
39  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
40  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
41  import static io.netty.handler.codec.http.HttpHeaderValues.BR;
42  import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
43  import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
44  import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
45  import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
46  import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
47  import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
48  
49  /**
50   * A decorating HTTP2 encoder that will compress data frames according to the {@code content-encoding} header for each
51   * stream. The compression provided by this class will be applied to the data for the entire stream.
52   */
53  @UnstableApi
54  public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
55      // We cannot remove this because it'll be breaking change
56      public static final int DEFAULT_COMPRESSION_LEVEL = 6;
57      public static final int DEFAULT_WINDOW_BITS = 15;
58      public static final int DEFAULT_MEM_LEVEL = 8;
59  
60      private int compressionLevel;
61      private int windowBits;
62      private int memLevel;
63      private final Http2Connection.PropertyKey propertyKey;
64  
65      private final boolean supportsCompressionOptions;
66  
67      private BrotliOptions brotliOptions;
68      private GzipOptions gzipCompressionOptions;
69      private DeflateOptions deflateOptions;
70      private ZstdOptions zstdOptions;
71  
72      /**
73       * Create a new {@link CompressorHttp2ConnectionEncoder} instance
74       * with default implementation of {@link StandardCompressionOptions}
75       */
76      public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
77          this(delegate, defaultCompressionOptions());
78      }
79  
80      private static CompressionOptions[] defaultCompressionOptions() {
81          if (Brotli.isAvailable()) {
82              return new CompressionOptions[] {
83                      StandardCompressionOptions.brotli(),
84                      StandardCompressionOptions.gzip(),
85                      StandardCompressionOptions.deflate() };
86          }
87          return new CompressionOptions[] { StandardCompressionOptions.gzip(), StandardCompressionOptions.deflate() };
88      }
89  
90      /**
91       * Create a new {@link CompressorHttp2ConnectionEncoder} instance
92       */
93      @Deprecated
94      public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate, int compressionLevel, int windowBits,
95                                              int memLevel) {
96          super(delegate);
97          this.compressionLevel = ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
98          this.windowBits = ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
99          this.memLevel = ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
100 
101         propertyKey = connection().newKey();
102         connection().addListener(new Http2ConnectionAdapter() {
103             @Override
104             public void onStreamRemoved(Http2Stream stream) {
105                 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
106                 if (compressor != null) {
107                     cleanup(stream, compressor);
108                 }
109             }
110         });
111 
112         supportsCompressionOptions = false;
113     }
114 
115     /**
116      * Create a new {@link CompressorHttp2ConnectionEncoder} with
117      * specified {@link StandardCompressionOptions}
118      */
119     public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate,
120                                             CompressionOptions... compressionOptionsArgs) {
121         super(delegate);
122         ObjectUtil.checkNotNull(compressionOptionsArgs, "CompressionOptions");
123         ObjectUtil.deepCheckNotNull("CompressionOptions", compressionOptionsArgs);
124 
125         for (CompressionOptions compressionOptions : compressionOptionsArgs) {
126             // BrotliOptions' class initialization depends on Brotli classes being on the classpath.
127             // The Brotli.isAvailable check ensures that BrotliOptions will only get instantiated if Brotli is on
128             // the classpath.
129             // This results in the static analysis of native-image identifying the instanceof BrotliOptions check
130             // and thus BrotliOptions itself as unreachable, enabling native-image to link all classes at build time
131             // and not complain about the missing Brotli classes.
132             if (Brotli.isAvailable() && compressionOptions instanceof BrotliOptions) {
133                 brotliOptions = (BrotliOptions) compressionOptions;
134             } else if (compressionOptions instanceof GzipOptions) {
135                 gzipCompressionOptions = (GzipOptions) compressionOptions;
136             } else if (compressionOptions instanceof DeflateOptions) {
137                 deflateOptions = (DeflateOptions) compressionOptions;
138             } else if (compressionOptions instanceof ZstdOptions) {
139                 zstdOptions = (ZstdOptions) compressionOptions;
140             } else {
141                 throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
142                         ": " + compressionOptions);
143             }
144         }
145 
146         supportsCompressionOptions = true;
147 
148         propertyKey = connection().newKey();
149         connection().addListener(new Http2ConnectionAdapter() {
150             @Override
151             public void onStreamRemoved(Http2Stream stream) {
152                 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
153                 if (compressor != null) {
154                     cleanup(stream, compressor);
155                 }
156             }
157         });
158     }
159 
160     @Override
161     public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
162             final boolean endOfStream, ChannelPromise promise) {
163         final Http2Stream stream = connection().stream(streamId);
164         final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
165         if (channel == null) {
166             // The compressor may be null if no compatible encoding type was found in this stream's headers
167             return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
168         }
169 
170         try {
171             // The channel will release the buffer after being written
172             channel.writeOutbound(data);
173             ByteBuf buf = nextReadableBuf(channel);
174             if (buf == null) {
175                 if (endOfStream) {
176                     if (channel.finish()) {
177                         buf = nextReadableBuf(channel);
178                     }
179                     return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
180                             true, promise);
181                 }
182                 // END_STREAM is not set and the assumption is data is still forthcoming.
183                 promise.setSuccess();
184                 return promise;
185             }
186 
187             PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
188             for (;;) {
189                 ByteBuf nextBuf = nextReadableBuf(channel);
190                 boolean compressedEndOfStream = nextBuf == null && endOfStream;
191                 if (compressedEndOfStream && channel.finish()) {
192                     nextBuf = nextReadableBuf(channel);
193                     compressedEndOfStream = nextBuf == null;
194                 }
195 
196                 ChannelPromise bufPromise = ctx.newPromise();
197                 combiner.add(bufPromise);
198                 super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
199                 if (nextBuf == null) {
200                     break;
201                 }
202 
203                 padding = 0; // Padding is only communicated once on the first iteration
204                 buf = nextBuf;
205             }
206             combiner.finish(promise);
207         } catch (Throwable cause) {
208             promise.tryFailure(cause);
209         } finally {
210             if (endOfStream) {
211                 cleanup(stream, channel);
212             }
213         }
214         return promise;
215     }
216 
217     @Override
218     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
219             boolean endStream, ChannelPromise promise) {
220         try {
221             // Determine if compression is required and sanitize the headers.
222             EmbeddedChannel compressor = newCompressor(ctx, headers, endStream);
223 
224             // Write the headers and create the stream object.
225             ChannelFuture future = super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
226 
227             // After the stream object has been created, then attach the compressor as a property for data compression.
228             bindCompressorToStream(compressor, streamId);
229 
230             return future;
231         } catch (Throwable e) {
232             promise.tryFailure(e);
233         }
234         return promise;
235     }
236 
237     @Override
238     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
239             final int streamDependency, final short weight, final boolean exclusive, final int padding,
240             final boolean endOfStream, final ChannelPromise promise) {
241         try {
242             // Determine if compression is required and sanitize the headers.
243             EmbeddedChannel compressor = newCompressor(ctx, headers, endOfStream);
244 
245             // Write the headers and create the stream object.
246             ChannelFuture future = super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
247                                                       padding, endOfStream, promise);
248 
249             // After the stream object has been created, then attach the compressor as a property for data compression.
250             bindCompressorToStream(compressor, streamId);
251 
252             return future;
253         } catch (Throwable e) {
254             promise.tryFailure(e);
255         }
256         return promise;
257     }
258 
259     /**
260      * Returns a new {@link EmbeddedChannel} that encodes the HTTP2 message content encoded in the specified
261      * {@code contentEncoding}.
262      *
263      * @param ctx the context.
264      * @param contentEncoding the value of the {@code content-encoding} header
265      * @return a new {@link ByteToMessageDecoder} if the specified encoding is supported. {@code null} otherwise
266      * (alternatively, you can throw a {@link Http2Exception} to block unknown encoding).
267      * @throws Http2Exception If the specified encoding is not supported and warrants an exception
268      */
269     protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSequence contentEncoding)
270             throws Http2Exception {
271         if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
272             return newCompressionChannel(ctx, ZlibWrapper.GZIP);
273         }
274         if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
275             return newCompressionChannel(ctx, ZlibWrapper.ZLIB);
276         }
277         if (Brotli.isAvailable() && brotliOptions != null && BR.contentEqualsIgnoreCase(contentEncoding)) {
278             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
279                     ctx.channel().config(), new BrotliEncoder(brotliOptions.parameters()));
280         }
281         if (zstdOptions != null && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
282             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
283                     ctx.channel().config(), new ZstdEncoder(zstdOptions.compressionLevel(),
284                     zstdOptions.blockSize(), zstdOptions.maxEncodeSize()));
285         }
286         // 'identity' or unsupported
287         return null;
288     }
289 
290     /**
291      * Returns the expected content encoding of the decoded content. Returning {@code contentEncoding} is the default
292      * behavior, which is the case for most compressors.
293      *
294      * @param contentEncoding the value of the {@code content-encoding} header
295      * @return the expected content encoding of the new content.
296      * @throws Http2Exception if the {@code contentEncoding} is not supported and warrants an exception
297      */
298     protected CharSequence getTargetContentEncoding(CharSequence contentEncoding) throws Http2Exception {
299         return contentEncoding;
300     }
301 
302     /**
303      * Generate a new instance of an {@link EmbeddedChannel} capable of compressing data
304      * @param ctx the context.
305      * @param wrapper Defines what type of encoder should be used
306      */
307     private EmbeddedChannel newCompressionChannel(final ChannelHandlerContext ctx, ZlibWrapper wrapper) {
308         if (supportsCompressionOptions) {
309             if (wrapper == ZlibWrapper.GZIP && gzipCompressionOptions != null) {
310                 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
311                         ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
312                         gzipCompressionOptions.compressionLevel(), gzipCompressionOptions.windowBits(),
313                         gzipCompressionOptions.memLevel()));
314             } else if (wrapper == ZlibWrapper.ZLIB && deflateOptions != null) {
315                 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
316                         ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
317                         deflateOptions.compressionLevel(), deflateOptions.windowBits(),
318                         deflateOptions.memLevel()));
319             } else {
320                 throw new IllegalArgumentException("Unsupported ZlibWrapper: " + wrapper);
321             }
322         } else {
323             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
324                     ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits,
325                     memLevel));
326         }
327     }
328 
329     /**
330      * Checks if a new compressor object is needed for the stream identified by {@code streamId}. This method will
331      * modify the {@code content-encoding} header contained in {@code headers}.
332      *
333      * @param ctx the context.
334      * @param headers Object representing headers which are to be written
335      * @param endOfStream Indicates if the stream has ended
336      * @return The channel used to compress data.
337      * @throws Http2Exception if any problems occur during initialization.
338      */
339     private EmbeddedChannel newCompressor(ChannelHandlerContext ctx, Http2Headers headers, boolean endOfStream)
340             throws Http2Exception {
341         if (endOfStream) {
342             return null;
343         }
344 
345         CharSequence encoding = headers.get(CONTENT_ENCODING);
346         if (encoding == null) {
347             encoding = IDENTITY;
348         }
349         final EmbeddedChannel compressor = newContentCompressor(ctx, encoding);
350         if (compressor != null) {
351             CharSequence targetContentEncoding = getTargetContentEncoding(encoding);
352             if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
353                 headers.remove(CONTENT_ENCODING);
354             } else {
355                 headers.set(CONTENT_ENCODING, targetContentEncoding);
356             }
357 
358             // The content length will be for the decompressed data. Since we will compress the data
359             // this content-length will not be correct. Instead of queuing messages or delaying sending
360             // header frames...just remove the content-length header
361             headers.remove(CONTENT_LENGTH);
362         }
363 
364         return compressor;
365     }
366 
367     /**
368      * Called after the super class has written the headers and created any associated stream objects.
369      * @param compressor The compressor associated with the stream identified by {@code streamId}.
370      * @param streamId The stream id for which the headers were written.
371      */
372     private void bindCompressorToStream(EmbeddedChannel compressor, int streamId) {
373         if (compressor != null) {
374             Http2Stream stream = connection().stream(streamId);
375             if (stream != null) {
376                 stream.setProperty(propertyKey, compressor);
377             }
378         }
379     }
380 
381     /**
382      * Release remaining content from {@link EmbeddedChannel} and remove the compressor from the {@link Http2Stream}.
383      *
384      * @param stream The stream for which {@code compressor} is the compressor for
385      * @param compressor The compressor for {@code stream}
386      */
387     void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
388         compressor.finishAndReleaseAll();
389         stream.removeProperty(propertyKey);
390     }
391 
392     /**
393      * Read the next compressed {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist.
394      *
395      * @param compressor The channel to read from
396      * @return The next decoded {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist
397      */
398     private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
399         for (;;) {
400             final ByteBuf buf = compressor.readOutbound();
401             if (buf == null) {
402                 return null;
403             }
404             if (!buf.isReadable()) {
405                 buf.release();
406                 continue;
407             }
408             return buf;
409         }
410     }
411 }