View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.Unpooled;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.channel.embedded.EmbeddedChannel;
23  import io.netty.handler.codec.ByteToMessageDecoder;
24  import io.netty.handler.codec.compression.BrotliEncoder;
25  import io.netty.handler.codec.compression.ZlibCodecFactory;
26  import io.netty.handler.codec.compression.ZlibWrapper;
27  import io.netty.handler.codec.compression.Brotli;
28  import io.netty.handler.codec.compression.BrotliOptions;
29  import io.netty.handler.codec.compression.CompressionOptions;
30  import io.netty.handler.codec.compression.DeflateOptions;
31  import io.netty.handler.codec.compression.GzipOptions;
32  import io.netty.handler.codec.compression.StandardCompressionOptions;
33  import io.netty.handler.codec.compression.ZstdEncoder;
34  import io.netty.handler.codec.compression.ZstdOptions;
35  import io.netty.handler.codec.compression.SnappyFrameEncoder;
36  import io.netty.handler.codec.compression.SnappyOptions;
37  import io.netty.util.concurrent.PromiseCombiner;
38  import io.netty.util.internal.ObjectUtil;
39  import io.netty.util.internal.UnstableApi;
40  
41  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
42  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
43  import static io.netty.handler.codec.http.HttpHeaderValues.BR;
44  import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
45  import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
46  import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
47  import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
48  import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
49  import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
50  import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;
51  
52  /**
53   * A decorating HTTP2 encoder that will compress data frames according to the {@code content-encoding} header for each
54   * stream. The compression provided by this class will be applied to the data for the entire stream.
55   */
56  @UnstableApi
57  public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
58      // We cannot remove this because it'll be breaking change
59      public static final int DEFAULT_COMPRESSION_LEVEL = 6;
60      public static final int DEFAULT_WINDOW_BITS = 15;
61      public static final int DEFAULT_MEM_LEVEL = 8;
62  
63      private int compressionLevel;
64      private int windowBits;
65      private int memLevel;
66      private final Http2Connection.PropertyKey propertyKey;
67  
68      private final boolean supportsCompressionOptions;
69  
70      private BrotliOptions brotliOptions;
71      private GzipOptions gzipCompressionOptions;
72      private DeflateOptions deflateOptions;
73      private ZstdOptions zstdOptions;
74      private SnappyOptions snappyOptions;
75  
76      /**
77       * Create a new {@link CompressorHttp2ConnectionEncoder} instance
78       * with default implementation of {@link StandardCompressionOptions}
79       */
80      public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
81          this(delegate, defaultCompressionOptions());
82      }
83  
84      private static CompressionOptions[] defaultCompressionOptions() {
85          if (Brotli.isAvailable()) {
86              return new CompressionOptions[] {
87                      StandardCompressionOptions.brotli(),
88                      StandardCompressionOptions.snappy(),
89                      StandardCompressionOptions.gzip(),
90                      StandardCompressionOptions.deflate() };
91          }
92          return new CompressionOptions[] { StandardCompressionOptions.snappy(),
93                  StandardCompressionOptions.gzip(), StandardCompressionOptions.deflate() };
94      }
95  
96      /**
97       * Create a new {@link CompressorHttp2ConnectionEncoder} instance
98       */
99      @Deprecated
100     public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate, int compressionLevel, int windowBits,
101                                             int memLevel) {
102         super(delegate);
103         this.compressionLevel = ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
104         this.windowBits = ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
105         this.memLevel = ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
106 
107         propertyKey = connection().newKey();
108         connection().addListener(new Http2ConnectionAdapter() {
109             @Override
110             public void onStreamRemoved(Http2Stream stream) {
111                 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
112                 if (compressor != null) {
113                     cleanup(stream, compressor);
114                 }
115             }
116         });
117 
118         supportsCompressionOptions = false;
119     }
120 
121     /**
122      * Create a new {@link CompressorHttp2ConnectionEncoder} with
123      * specified {@link StandardCompressionOptions}
124      */
125     public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate,
126                                             CompressionOptions... compressionOptionsArgs) {
127         super(delegate);
128         ObjectUtil.checkNotNull(compressionOptionsArgs, "CompressionOptions");
129         ObjectUtil.deepCheckNotNull("CompressionOptions", compressionOptionsArgs);
130 
131         for (CompressionOptions compressionOptions : compressionOptionsArgs) {
132             // BrotliOptions' class initialization depends on Brotli classes being on the classpath.
133             // The Brotli.isAvailable check ensures that BrotliOptions will only get instantiated if Brotli is on
134             // the classpath.
135             // This results in the static analysis of native-image identifying the instanceof BrotliOptions check
136             // and thus BrotliOptions itself as unreachable, enabling native-image to link all classes at build time
137             // and not complain about the missing Brotli classes.
138             if (Brotli.isAvailable() && compressionOptions instanceof BrotliOptions) {
139                 brotliOptions = (BrotliOptions) compressionOptions;
140             } else if (compressionOptions instanceof GzipOptions) {
141                 gzipCompressionOptions = (GzipOptions) compressionOptions;
142             } else if (compressionOptions instanceof DeflateOptions) {
143                 deflateOptions = (DeflateOptions) compressionOptions;
144             } else if (compressionOptions instanceof ZstdOptions) {
145                 zstdOptions = (ZstdOptions) compressionOptions;
146             } else if (compressionOptions instanceof SnappyOptions) {
147                 snappyOptions = (SnappyOptions) compressionOptions;
148             } else {
149                 throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
150                         ": " + compressionOptions);
151             }
152         }
153 
154         supportsCompressionOptions = true;
155 
156         propertyKey = connection().newKey();
157         connection().addListener(new Http2ConnectionAdapter() {
158             @Override
159             public void onStreamRemoved(Http2Stream stream) {
160                 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
161                 if (compressor != null) {
162                     cleanup(stream, compressor);
163                 }
164             }
165         });
166     }
167 
168     @Override
169     public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
170             final boolean endOfStream, ChannelPromise promise) {
171         final Http2Stream stream = connection().stream(streamId);
172         final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
173         if (channel == null) {
174             // The compressor may be null if no compatible encoding type was found in this stream's headers
175             return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
176         }
177 
178         try {
179             // The channel will release the buffer after being written
180             channel.writeOutbound(data);
181             ByteBuf buf = nextReadableBuf(channel);
182             if (buf == null) {
183                 if (endOfStream) {
184                     if (channel.finish()) {
185                         buf = nextReadableBuf(channel);
186                     }
187                     return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
188                             true, promise);
189                 }
190                 // END_STREAM is not set and the assumption is data is still forthcoming.
191                 promise.setSuccess();
192                 return promise;
193             }
194 
195             PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
196             for (;;) {
197                 ByteBuf nextBuf = nextReadableBuf(channel);
198                 boolean compressedEndOfStream = nextBuf == null && endOfStream;
199                 if (compressedEndOfStream && channel.finish()) {
200                     nextBuf = nextReadableBuf(channel);
201                     compressedEndOfStream = nextBuf == null;
202                 }
203 
204                 ChannelPromise bufPromise = ctx.newPromise();
205                 combiner.add(bufPromise);
206                 super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
207                 if (nextBuf == null) {
208                     break;
209                 }
210 
211                 padding = 0; // Padding is only communicated once on the first iteration
212                 buf = nextBuf;
213             }
214             combiner.finish(promise);
215         } catch (Throwable cause) {
216             promise.tryFailure(cause);
217         } finally {
218             if (endOfStream) {
219                 cleanup(stream, channel);
220             }
221         }
222         return promise;
223     }
224 
225     @Override
226     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
227             boolean endStream, ChannelPromise promise) {
228         try {
229             // Determine if compression is required and sanitize the headers.
230             EmbeddedChannel compressor = newCompressor(ctx, headers, endStream);
231 
232             // Write the headers and create the stream object.
233             ChannelFuture future = super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
234 
235             // After the stream object has been created, then attach the compressor as a property for data compression.
236             bindCompressorToStream(compressor, streamId);
237 
238             return future;
239         } catch (Throwable e) {
240             promise.tryFailure(e);
241         }
242         return promise;
243     }
244 
245     @Override
246     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
247             final int streamDependency, final short weight, final boolean exclusive, final int padding,
248             final boolean endOfStream, final ChannelPromise promise) {
249         try {
250             // Determine if compression is required and sanitize the headers.
251             EmbeddedChannel compressor = newCompressor(ctx, headers, endOfStream);
252 
253             // Write the headers and create the stream object.
254             ChannelFuture future = super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
255                                                       padding, endOfStream, promise);
256 
257             // After the stream object has been created, then attach the compressor as a property for data compression.
258             bindCompressorToStream(compressor, streamId);
259 
260             return future;
261         } catch (Throwable e) {
262             promise.tryFailure(e);
263         }
264         return promise;
265     }
266 
267     /**
268      * Returns a new {@link EmbeddedChannel} that encodes the HTTP2 message content encoded in the specified
269      * {@code contentEncoding}.
270      *
271      * @param ctx the context.
272      * @param contentEncoding the value of the {@code content-encoding} header
273      * @return a new {@link ByteToMessageDecoder} if the specified encoding is supported. {@code null} otherwise
274      * (alternatively, you can throw a {@link Http2Exception} to block unknown encoding).
275      * @throws Http2Exception If the specified encoding is not supported and warrants an exception
276      */
277     protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSequence contentEncoding)
278             throws Http2Exception {
279         if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
280             return newCompressionChannel(ctx, ZlibWrapper.GZIP);
281         }
282         if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
283             return newCompressionChannel(ctx, ZlibWrapper.ZLIB);
284         }
285         if (Brotli.isAvailable() && brotliOptions != null && BR.contentEqualsIgnoreCase(contentEncoding)) {
286             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
287                     ctx.channel().config(), new BrotliEncoder(brotliOptions.parameters()));
288         }
289         if (zstdOptions != null && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
290             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
291                     ctx.channel().config(), new ZstdEncoder(zstdOptions.compressionLevel(),
292                     zstdOptions.blockSize(), zstdOptions.maxEncodeSize()));
293         }
294         if (snappyOptions != null && SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
295             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
296                     ctx.channel().config(), new SnappyFrameEncoder());
297         }
298         // 'identity' or unsupported
299         return null;
300     }
301 
302     /**
303      * Returns the expected content encoding of the decoded content. Returning {@code contentEncoding} is the default
304      * behavior, which is the case for most compressors.
305      *
306      * @param contentEncoding the value of the {@code content-encoding} header
307      * @return the expected content encoding of the new content.
308      * @throws Http2Exception if the {@code contentEncoding} is not supported and warrants an exception
309      */
310     protected CharSequence getTargetContentEncoding(CharSequence contentEncoding) throws Http2Exception {
311         return contentEncoding;
312     }
313 
314     /**
315      * Generate a new instance of an {@link EmbeddedChannel} capable of compressing data
316      * @param ctx the context.
317      * @param wrapper Defines what type of encoder should be used
318      */
319     private EmbeddedChannel newCompressionChannel(final ChannelHandlerContext ctx, ZlibWrapper wrapper) {
320         if (supportsCompressionOptions) {
321             if (wrapper == ZlibWrapper.GZIP && gzipCompressionOptions != null) {
322                 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
323                         ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
324                         gzipCompressionOptions.compressionLevel(), gzipCompressionOptions.windowBits(),
325                         gzipCompressionOptions.memLevel()));
326             } else if (wrapper == ZlibWrapper.ZLIB && deflateOptions != null) {
327                 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
328                         ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
329                         deflateOptions.compressionLevel(), deflateOptions.windowBits(),
330                         deflateOptions.memLevel()));
331             } else {
332                 throw new IllegalArgumentException("Unsupported ZlibWrapper: " + wrapper);
333             }
334         } else {
335             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
336                     ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits,
337                     memLevel));
338         }
339     }
340 
341     /**
342      * Checks if a new compressor object is needed for the stream identified by {@code streamId}. This method will
343      * modify the {@code content-encoding} header contained in {@code headers}.
344      *
345      * @param ctx the context.
346      * @param headers Object representing headers which are to be written
347      * @param endOfStream Indicates if the stream has ended
348      * @return The channel used to compress data.
349      * @throws Http2Exception if any problems occur during initialization.
350      */
351     private EmbeddedChannel newCompressor(ChannelHandlerContext ctx, Http2Headers headers, boolean endOfStream)
352             throws Http2Exception {
353         if (endOfStream) {
354             return null;
355         }
356 
357         CharSequence encoding = headers.get(CONTENT_ENCODING);
358         if (encoding == null) {
359             encoding = IDENTITY;
360         }
361         final EmbeddedChannel compressor = newContentCompressor(ctx, encoding);
362         if (compressor != null) {
363             CharSequence targetContentEncoding = getTargetContentEncoding(encoding);
364             if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
365                 headers.remove(CONTENT_ENCODING);
366             } else {
367                 headers.set(CONTENT_ENCODING, targetContentEncoding);
368             }
369 
370             // The content length will be for the decompressed data. Since we will compress the data
371             // this content-length will not be correct. Instead of queuing messages or delaying sending
372             // header frames...just remove the content-length header
373             headers.remove(CONTENT_LENGTH);
374         }
375 
376         return compressor;
377     }
378 
379     /**
380      * Called after the super class has written the headers and created any associated stream objects.
381      * @param compressor The compressor associated with the stream identified by {@code streamId}.
382      * @param streamId The stream id for which the headers were written.
383      */
384     private void bindCompressorToStream(EmbeddedChannel compressor, int streamId) {
385         if (compressor != null) {
386             Http2Stream stream = connection().stream(streamId);
387             if (stream != null) {
388                 stream.setProperty(propertyKey, compressor);
389             }
390         }
391     }
392 
393     /**
394      * Release remaining content from {@link EmbeddedChannel} and remove the compressor from the {@link Http2Stream}.
395      *
396      * @param stream The stream for which {@code compressor} is the compressor for
397      * @param compressor The compressor for {@code stream}
398      */
399     void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
400         compressor.finishAndReleaseAll();
401         stream.removeProperty(propertyKey);
402     }
403 
404     /**
405      * Read the next compressed {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist.
406      *
407      * @param compressor The channel to read from
408      * @return The next decoded {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist
409      */
410     private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
411         for (;;) {
412             final ByteBuf buf = compressor.readOutbound();
413             if (buf == null) {
414                 return null;
415             }
416             if (!buf.isReadable()) {
417                 buf.release();
418                 continue;
419             }
420             return buf;
421         }
422     }
423 }