/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at:
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.Unpooled;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.channel.embedded.EmbeddedChannel;
23  import io.netty.handler.codec.ByteToMessageDecoder;
24  import io.netty.handler.codec.compression.BrotliEncoder;
25  import io.netty.handler.codec.compression.ZlibCodecFactory;
26  import io.netty.handler.codec.compression.ZlibWrapper;
27  import io.netty.handler.codec.compression.Brotli;
28  import io.netty.handler.codec.compression.BrotliOptions;
29  import io.netty.handler.codec.compression.CompressionOptions;
30  import io.netty.handler.codec.compression.DeflateOptions;
31  import io.netty.handler.codec.compression.GzipOptions;
32  import io.netty.handler.codec.compression.StandardCompressionOptions;
33  import io.netty.handler.codec.compression.Zstd;
34  import io.netty.handler.codec.compression.ZstdEncoder;
35  import io.netty.handler.codec.compression.ZstdOptions;
36  import io.netty.handler.codec.compression.SnappyFrameEncoder;
37  import io.netty.handler.codec.compression.SnappyOptions;
38  import io.netty.util.concurrent.PromiseCombiner;
39  import io.netty.util.internal.ObjectUtil;
40  
41  import java.util.ArrayList;
42  import java.util.List;
43  
44  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
45  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
46  import static io.netty.handler.codec.http.HttpHeaderValues.BR;
47  import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
48  import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
49  import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
50  import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
51  import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
52  import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
53  import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;
54  
55  /**
56   * A decorating HTTP2 encoder that will compress data frames according to the {@code content-encoding} header for each
57   * stream. The compression provided by this class will be applied to the data for the entire stream.
58   */
59  public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
60      // We cannot remove this because it'll be breaking change
61      public static final int DEFAULT_COMPRESSION_LEVEL = 6;
62      public static final int DEFAULT_WINDOW_BITS = 15;
63      public static final int DEFAULT_MEM_LEVEL = 8;
64  
65      private int compressionLevel;
66      private int windowBits;
67      private int memLevel;
68      private final Http2Connection.PropertyKey propertyKey;
69  
70      private final boolean supportsCompressionOptions;
71  
72      private BrotliOptions brotliOptions;
73      private GzipOptions gzipCompressionOptions;
74      private DeflateOptions deflateOptions;
75      private ZstdOptions zstdOptions;
76      private SnappyOptions snappyOptions;
77  
78      /**
79       * Create a new {@link CompressorHttp2ConnectionEncoder} instance
80       * with default implementation of {@link StandardCompressionOptions}
81       */
82      public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
83          this(delegate, defaultCompressionOptions());
84      }
85  
86      private static CompressionOptions[] defaultCompressionOptions() {
87          List<CompressionOptions> compressionOptions = new ArrayList<CompressionOptions>();
88          compressionOptions.add(StandardCompressionOptions.gzip());
89          compressionOptions.add(StandardCompressionOptions.deflate());
90          compressionOptions.add(StandardCompressionOptions.snappy());
91          if (Brotli.isAvailable()) {
92              compressionOptions.add(StandardCompressionOptions.brotli());
93          }
94          if (Zstd.isAvailable()) {
95              compressionOptions.add(StandardCompressionOptions.zstd());
96          }
97          return compressionOptions.toArray(new CompressionOptions[0]);
98      }
99  
100     /**
101      * Create a new {@link CompressorHttp2ConnectionEncoder} instance
102      */
103     @Deprecated
104     public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate, int compressionLevel, int windowBits,
105                                             int memLevel) {
106         super(delegate);
107         this.compressionLevel = ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
108         this.windowBits = ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
109         this.memLevel = ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
110 
111         propertyKey = connection().newKey();
112         connection().addListener(new Http2ConnectionAdapter() {
113             @Override
114             public void onStreamRemoved(Http2Stream stream) {
115                 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
116                 if (compressor != null) {
117                     cleanup(stream, compressor);
118                 }
119             }
120         });
121 
122         supportsCompressionOptions = false;
123     }
124 
125     /**
126      * Create a new {@link CompressorHttp2ConnectionEncoder} with
127      * specified {@link StandardCompressionOptions}
128      */
129     public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate,
130                                             CompressionOptions... compressionOptionsArgs) {
131         super(delegate);
132         ObjectUtil.checkNotNull(compressionOptionsArgs, "CompressionOptions");
133         ObjectUtil.deepCheckNotNull("CompressionOptions", compressionOptionsArgs);
134 
135         for (CompressionOptions compressionOptions : compressionOptionsArgs) {
136             // BrotliOptions' class initialization depends on Brotli classes being on the classpath.
137             // The Brotli.isAvailable check ensures that BrotliOptions will only get instantiated if Brotli is on
138             // the classpath.
139             // This results in the static analysis of native-image identifying the instanceof BrotliOptions check
140             // and thus BrotliOptions itself as unreachable, enabling native-image to link all classes at build time
141             // and not complain about the missing Brotli classes.
142             if (Brotli.isAvailable() && compressionOptions instanceof BrotliOptions) {
143                 brotliOptions = (BrotliOptions) compressionOptions;
144             } else if (compressionOptions instanceof GzipOptions) {
145                 gzipCompressionOptions = (GzipOptions) compressionOptions;
146             } else if (compressionOptions instanceof DeflateOptions) {
147                 deflateOptions = (DeflateOptions) compressionOptions;
148             } else if (compressionOptions instanceof ZstdOptions) {
149                 zstdOptions = (ZstdOptions) compressionOptions;
150             } else if (compressionOptions instanceof SnappyOptions) {
151                 snappyOptions = (SnappyOptions) compressionOptions;
152             } else {
153                 throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
154                         ": " + compressionOptions);
155             }
156         }
157 
158         supportsCompressionOptions = true;
159 
160         propertyKey = connection().newKey();
161         connection().addListener(new Http2ConnectionAdapter() {
162             @Override
163             public void onStreamRemoved(Http2Stream stream) {
164                 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
165                 if (compressor != null) {
166                     cleanup(stream, compressor);
167                 }
168             }
169         });
170     }
171 
172     @Override
173     public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
174             final boolean endOfStream, ChannelPromise promise) {
175         final Http2Stream stream = connection().stream(streamId);
176         final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
177         if (channel == null) {
178             // The compressor may be null if no compatible encoding type was found in this stream's headers
179             return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
180         }
181 
182         try {
183             // The channel will release the buffer after being written
184             channel.writeOutbound(data);
185             ByteBuf buf = nextReadableBuf(channel);
186             if (buf == null) {
187                 if (endOfStream) {
188                     if (channel.finish()) {
189                         buf = nextReadableBuf(channel);
190                     }
191                     return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
192                             true, promise);
193                 }
194                 // END_STREAM is not set and the assumption is data is still forthcoming.
195                 promise.setSuccess();
196                 return promise;
197             }
198 
199             PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
200             for (;;) {
201                 ByteBuf nextBuf = nextReadableBuf(channel);
202                 boolean compressedEndOfStream = nextBuf == null && endOfStream;
203                 if (compressedEndOfStream && channel.finish()) {
204                     nextBuf = nextReadableBuf(channel);
205                     compressedEndOfStream = nextBuf == null;
206                 }
207 
208                 ChannelPromise bufPromise = ctx.newPromise();
209                 combiner.add(bufPromise);
210                 super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
211                 if (nextBuf == null) {
212                     break;
213                 }
214 
215                 padding = 0; // Padding is only communicated once on the first iteration
216                 buf = nextBuf;
217             }
218             combiner.finish(promise);
219         } catch (Throwable cause) {
220             promise.tryFailure(cause);
221         } finally {
222             if (endOfStream) {
223                 cleanup(stream, channel);
224             }
225         }
226         return promise;
227     }
228 
229     @Override
230     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
231             boolean endStream, ChannelPromise promise) {
232         try {
233             // Determine if compression is required and sanitize the headers.
234             EmbeddedChannel compressor = newCompressor(ctx, headers, endStream);
235 
236             // Write the headers and create the stream object.
237             ChannelFuture future = super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
238 
239             // After the stream object has been created, then attach the compressor as a property for data compression.
240             bindCompressorToStream(compressor, streamId);
241 
242             return future;
243         } catch (Throwable e) {
244             promise.tryFailure(e);
245         }
246         return promise;
247     }
248 
249     @Override
250     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
251             final int streamDependency, final short weight, final boolean exclusive, final int padding,
252             final boolean endOfStream, final ChannelPromise promise) {
253         try {
254             // Determine if compression is required and sanitize the headers.
255             EmbeddedChannel compressor = newCompressor(ctx, headers, endOfStream);
256 
257             // Write the headers and create the stream object.
258             ChannelFuture future = super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
259                                                       padding, endOfStream, promise);
260 
261             // After the stream object has been created, then attach the compressor as a property for data compression.
262             bindCompressorToStream(compressor, streamId);
263 
264             return future;
265         } catch (Throwable e) {
266             promise.tryFailure(e);
267         }
268         return promise;
269     }
270 
271     /**
272      * Returns a new {@link EmbeddedChannel} that encodes the HTTP2 message content encoded in the specified
273      * {@code contentEncoding}.
274      *
275      * @param ctx the context.
276      * @param contentEncoding the value of the {@code content-encoding} header
277      * @return a new {@link ByteToMessageDecoder} if the specified encoding is supported. {@code null} otherwise
278      * (alternatively, you can throw a {@link Http2Exception} to block unknown encoding).
279      * @throws Http2Exception If the specified encoding is not supported and warrants an exception
280      */
281     protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSequence contentEncoding)
282             throws Http2Exception {
283         if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
284             return newCompressionChannel(ctx, ZlibWrapper.GZIP);
285         }
286         if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
287             return newCompressionChannel(ctx, ZlibWrapper.ZLIB);
288         }
289         if (Brotli.isAvailable() && brotliOptions != null && BR.contentEqualsIgnoreCase(contentEncoding)) {
290             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
291                     ctx.channel().config(), new BrotliEncoder(brotliOptions.parameters()));
292         }
293         if (zstdOptions != null && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
294             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
295                     ctx.channel().config(), new ZstdEncoder(zstdOptions.compressionLevel(),
296                     zstdOptions.blockSize(), zstdOptions.maxEncodeSize()));
297         }
298         if (snappyOptions != null && SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
299             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
300                     ctx.channel().config(), new SnappyFrameEncoder());
301         }
302         // 'identity' or unsupported
303         return null;
304     }
305 
306     /**
307      * Returns the expected content encoding of the decoded content. Returning {@code contentEncoding} is the default
308      * behavior, which is the case for most compressors.
309      *
310      * @param contentEncoding the value of the {@code content-encoding} header
311      * @return the expected content encoding of the new content.
312      * @throws Http2Exception if the {@code contentEncoding} is not supported and warrants an exception
313      */
314     protected CharSequence getTargetContentEncoding(CharSequence contentEncoding) throws Http2Exception {
315         return contentEncoding;
316     }
317 
318     /**
319      * Generate a new instance of an {@link EmbeddedChannel} capable of compressing data
320      * @param ctx the context.
321      * @param wrapper Defines what type of encoder should be used
322      */
323     private EmbeddedChannel newCompressionChannel(final ChannelHandlerContext ctx, ZlibWrapper wrapper) {
324         if (supportsCompressionOptions) {
325             if (wrapper == ZlibWrapper.GZIP && gzipCompressionOptions != null) {
326                 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
327                         ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
328                         gzipCompressionOptions.compressionLevel(), gzipCompressionOptions.windowBits(),
329                         gzipCompressionOptions.memLevel()));
330             } else if (wrapper == ZlibWrapper.ZLIB && deflateOptions != null) {
331                 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
332                         ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
333                         deflateOptions.compressionLevel(), deflateOptions.windowBits(),
334                         deflateOptions.memLevel()));
335             } else {
336                 throw new IllegalArgumentException("Unsupported ZlibWrapper: " + wrapper);
337             }
338         } else {
339             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
340                     ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits,
341                     memLevel));
342         }
343     }
344 
345     /**
346      * Checks if a new compressor object is needed for the stream identified by {@code streamId}. This method will
347      * modify the {@code content-encoding} header contained in {@code headers}.
348      *
349      * @param ctx the context.
350      * @param headers Object representing headers which are to be written
351      * @param endOfStream Indicates if the stream has ended
352      * @return The channel used to compress data.
353      * @throws Http2Exception if any problems occur during initialization.
354      */
355     private EmbeddedChannel newCompressor(ChannelHandlerContext ctx, Http2Headers headers, boolean endOfStream)
356             throws Http2Exception {
357         if (endOfStream) {
358             return null;
359         }
360 
361         CharSequence encoding = headers.get(CONTENT_ENCODING);
362         if (encoding == null) {
363             encoding = IDENTITY;
364         }
365         final EmbeddedChannel compressor = newContentCompressor(ctx, encoding);
366         if (compressor != null) {
367             CharSequence targetContentEncoding = getTargetContentEncoding(encoding);
368             if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
369                 headers.remove(CONTENT_ENCODING);
370             } else {
371                 headers.set(CONTENT_ENCODING, targetContentEncoding);
372             }
373 
374             // The content length will be for the decompressed data. Since we will compress the data
375             // this content-length will not be correct. Instead of queuing messages or delaying sending
376             // header frames...just remove the content-length header
377             headers.remove(CONTENT_LENGTH);
378         }
379 
380         return compressor;
381     }
382 
383     /**
384      * Called after the super class has written the headers and created any associated stream objects.
385      * @param compressor The compressor associated with the stream identified by {@code streamId}.
386      * @param streamId The stream id for which the headers were written.
387      */
388     private void bindCompressorToStream(EmbeddedChannel compressor, int streamId) {
389         if (compressor != null) {
390             Http2Stream stream = connection().stream(streamId);
391             if (stream != null) {
392                 stream.setProperty(propertyKey, compressor);
393             }
394         }
395     }
396 
397     /**
398      * Release remaining content from {@link EmbeddedChannel} and remove the compressor from the {@link Http2Stream}.
399      *
400      * @param stream The stream for which {@code compressor} is the compressor for
401      * @param compressor The compressor for {@code stream}
402      */
403     void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
404         compressor.finishAndReleaseAll();
405         stream.removeProperty(propertyKey);
406     }
407 
408     /**
409      * Read the next compressed {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist.
410      *
411      * @param compressor The channel to read from
412      * @return The next decoded {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist
413      */
414     private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
415         for (;;) {
416             final ByteBuf buf = compressor.readOutbound();
417             if (buf == null) {
418                 return null;
419             }
420             if (!buf.isReadable()) {
421                 buf.release();
422                 continue;
423             }
424             return buf;
425         }
426     }
427 }