View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.Unpooled;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelId;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.embedded.EmbeddedChannel;
27  import io.netty.handler.codec.ByteToMessageDecoder;
28  import io.netty.handler.codec.compression.BrotliEncoder;
29  import io.netty.handler.codec.compression.ZlibCodecFactory;
30  import io.netty.handler.codec.compression.ZlibWrapper;
31  import io.netty.handler.codec.compression.Brotli;
32  import io.netty.handler.codec.compression.BrotliOptions;
33  import io.netty.handler.codec.compression.CompressionOptions;
34  import io.netty.handler.codec.compression.DeflateOptions;
35  import io.netty.handler.codec.compression.GzipOptions;
36  import io.netty.handler.codec.compression.StandardCompressionOptions;
37  import io.netty.handler.codec.compression.Zstd;
38  import io.netty.handler.codec.compression.ZstdEncoder;
39  import io.netty.handler.codec.compression.ZstdOptions;
40  import io.netty.handler.codec.compression.SnappyFrameEncoder;
41  import io.netty.handler.codec.compression.SnappyOptions;
42  import io.netty.util.concurrent.PromiseCombiner;
43  import io.netty.util.internal.ObjectUtil;
44  
45  import java.util.ArrayList;
46  import java.util.List;
47  
48  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
49  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
50  import static io.netty.handler.codec.http.HttpHeaderValues.BR;
51  import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
52  import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
53  import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
54  import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
55  import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
56  import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
57  import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;
58  
59  /**
60   * A decorating HTTP2 encoder that will compress data frames according to the {@code content-encoding} header for each
61   * stream. The compression provided by this class will be applied to the data for the entire stream.
62   */
63  public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
64      // We cannot remove this because it'll be breaking change
65      public static final int DEFAULT_COMPRESSION_LEVEL = 6;
66      public static final int DEFAULT_WINDOW_BITS = 15;
67      public static final int DEFAULT_MEM_LEVEL = 8;
68  
69      private int compressionLevel;
70      private int windowBits;
71      private int memLevel;
72      private final Http2Connection.PropertyKey propertyKey;
73  
74      private final boolean supportsCompressionOptions;
75  
76      private BrotliOptions brotliOptions;
77      private GzipOptions gzipCompressionOptions;
78      private DeflateOptions deflateOptions;
79      private ZstdOptions zstdOptions;
80      private SnappyOptions snappyOptions;
81  
82      /**
83       * Create a new {@link CompressorHttp2ConnectionEncoder} instance
84       * with default implementation of {@link StandardCompressionOptions}
85       */
86      public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
87          this(delegate, defaultCompressionOptions());
88      }
89  
90      private static CompressionOptions[] defaultCompressionOptions() {
91          List<CompressionOptions> compressionOptions = new ArrayList<CompressionOptions>();
92          compressionOptions.add(StandardCompressionOptions.gzip());
93          compressionOptions.add(StandardCompressionOptions.deflate());
94          compressionOptions.add(StandardCompressionOptions.snappy());
95          if (Brotli.isAvailable()) {
96              compressionOptions.add(StandardCompressionOptions.brotli());
97          }
98          if (Zstd.isAvailable()) {
99              compressionOptions.add(StandardCompressionOptions.zstd());
100         }
101         return compressionOptions.toArray(new CompressionOptions[0]);
102     }
103 
104     /**
105      * Create a new {@link CompressorHttp2ConnectionEncoder} instance
106      */
107     @Deprecated
108     public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate, int compressionLevel, int windowBits,
109                                             int memLevel) {
110         super(delegate);
111         this.compressionLevel = ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
112         this.windowBits = ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
113         this.memLevel = ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
114 
115         propertyKey = connection().newKey();
116         connection().addListener(new Http2ConnectionAdapter() {
117             @Override
118             public void onStreamRemoved(Http2Stream stream) {
119                 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
120                 if (compressor != null) {
121                     cleanup(stream, compressor);
122                 }
123             }
124         });
125 
126         supportsCompressionOptions = false;
127     }
128 
129     /**
130      * Create a new {@link CompressorHttp2ConnectionEncoder} with
131      * specified {@link StandardCompressionOptions}
132      */
133     public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate,
134                                             CompressionOptions... compressionOptionsArgs) {
135         super(delegate);
136         ObjectUtil.checkNotNull(compressionOptionsArgs, "CompressionOptions");
137         ObjectUtil.deepCheckNotNull("CompressionOptions", compressionOptionsArgs);
138 
139         for (CompressionOptions compressionOptions : compressionOptionsArgs) {
140             // BrotliOptions' class initialization depends on Brotli classes being on the classpath.
141             // The Brotli.isAvailable check ensures that BrotliOptions will only get instantiated if Brotli is on
142             // the classpath.
143             // This results in the static analysis of native-image identifying the instanceof BrotliOptions check
144             // and thus BrotliOptions itself as unreachable, enabling native-image to link all classes at build time
145             // and not complain about the missing Brotli classes.
146             if (Brotli.isAvailable() && compressionOptions instanceof BrotliOptions) {
147                 brotliOptions = (BrotliOptions) compressionOptions;
148             } else if (compressionOptions instanceof GzipOptions) {
149                 gzipCompressionOptions = (GzipOptions) compressionOptions;
150             } else if (compressionOptions instanceof DeflateOptions) {
151                 deflateOptions = (DeflateOptions) compressionOptions;
152             } else if (compressionOptions instanceof ZstdOptions) {
153                 zstdOptions = (ZstdOptions) compressionOptions;
154             } else if (compressionOptions instanceof SnappyOptions) {
155                 snappyOptions = (SnappyOptions) compressionOptions;
156             } else {
157                 throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
158                         ": " + compressionOptions);
159             }
160         }
161 
162         supportsCompressionOptions = true;
163 
164         propertyKey = connection().newKey();
165         connection().addListener(new Http2ConnectionAdapter() {
166             @Override
167             public void onStreamRemoved(Http2Stream stream) {
168                 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
169                 if (compressor != null) {
170                     cleanup(stream, compressor);
171                 }
172             }
173         });
174     }
175 
176     @Override
177     public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
178             final boolean endOfStream, ChannelPromise promise) {
179         final Http2Stream stream = connection().stream(streamId);
180         final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
181         if (channel == null) {
182             // The compressor may be null if no compatible encoding type was found in this stream's headers
183             return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
184         }
185 
186         try {
187             // The channel will release the buffer after being written
188             channel.writeOutbound(data);
189             ByteBuf buf = nextReadableBuf(channel);
190             if (buf == null) {
191                 if (endOfStream) {
192                     if (channel.finish()) {
193                         buf = nextReadableBuf(channel);
194                     }
195                     return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
196                             true, promise);
197                 }
198                 // END_STREAM is not set and the assumption is data is still forthcoming.
199                 promise.setSuccess();
200                 return promise;
201             }
202 
203             PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
204             for (;;) {
205                 ByteBuf nextBuf = nextReadableBuf(channel);
206                 boolean compressedEndOfStream = nextBuf == null && endOfStream;
207                 if (compressedEndOfStream && channel.finish()) {
208                     nextBuf = nextReadableBuf(channel);
209                     compressedEndOfStream = nextBuf == null;
210                 }
211 
212                 ChannelPromise bufPromise = ctx.newPromise();
213                 combiner.add(bufPromise);
214                 super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
215                 if (nextBuf == null) {
216                     break;
217                 }
218 
219                 padding = 0; // Padding is only communicated once on the first iteration
220                 buf = nextBuf;
221             }
222             combiner.finish(promise);
223         } catch (Throwable cause) {
224             promise.tryFailure(cause);
225         } finally {
226             if (endOfStream) {
227                 cleanup(stream, channel);
228             }
229         }
230         return promise;
231     }
232 
233     @Override
234     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
235             boolean endStream, ChannelPromise promise) {
236         try {
237             // Determine if compression is required and sanitize the headers.
238             EmbeddedChannel compressor = newCompressor(ctx, headers, endStream);
239 
240             // Write the headers and create the stream object.
241             ChannelFuture future = super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
242 
243             // After the stream object has been created, then attach the compressor as a property for data compression.
244             bindCompressorToStream(compressor, streamId);
245 
246             return future;
247         } catch (Throwable e) {
248             promise.tryFailure(e);
249         }
250         return promise;
251     }
252 
253     @Override
254     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
255             final int streamDependency, final short weight, final boolean exclusive, final int padding,
256             final boolean endOfStream, final ChannelPromise promise) {
257         try {
258             // Determine if compression is required and sanitize the headers.
259             EmbeddedChannel compressor = newCompressor(ctx, headers, endOfStream);
260 
261             // Write the headers and create the stream object.
262             ChannelFuture future = super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
263                                                       padding, endOfStream, promise);
264 
265             // After the stream object has been created, then attach the compressor as a property for data compression.
266             bindCompressorToStream(compressor, streamId);
267 
268             return future;
269         } catch (Throwable e) {
270             promise.tryFailure(e);
271         }
272         return promise;
273     }
274 
275     /**
276      * Returns a new {@link EmbeddedChannel} that encodes the HTTP2 message content encoded in the specified
277      * {@code contentEncoding}.
278      *
279      * @param ctx the context.
280      * @param contentEncoding the value of the {@code content-encoding} header
281      * @return a new {@link ByteToMessageDecoder} if the specified encoding is supported. {@code null} otherwise
282      * (alternatively, you can throw a {@link Http2Exception} to block unknown encoding).
283      * @throws Http2Exception If the specified encoding is not supported and warrants an exception
284      */
285     protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSequence contentEncoding)
286             throws Http2Exception {
287         if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
288             return newCompressionChannel(ctx, ZlibWrapper.GZIP);
289         }
290         if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
291             return newCompressionChannel(ctx, ZlibWrapper.ZLIB);
292         }
293         Channel channel = ctx.channel();
294         if (Brotli.isAvailable() && brotliOptions != null && BR.contentEqualsIgnoreCase(contentEncoding)) {
295             return EmbeddedChannel.builder()
296                     .channelId(channel.id())
297                     .hasDisconnect(channel.metadata().hasDisconnect())
298                     .config(channel.config())
299                     .handlers(new BrotliEncoder(brotliOptions.parameters()))
300                     .build();
301         }
302         if (zstdOptions != null && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
303             return EmbeddedChannel.builder()
304                     .channelId(channel.id())
305                     .hasDisconnect(channel.metadata().hasDisconnect())
306                     .config(channel.config())
307                     .handlers(new ZstdEncoder(zstdOptions.compressionLevel(),
308                             zstdOptions.blockSize(), zstdOptions.maxEncodeSize()))
309                     .build();
310         }
311         if (snappyOptions != null && SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
312             return EmbeddedChannel.builder()
313                     .channelId(channel.id())
314                     .hasDisconnect(channel.metadata().hasDisconnect())
315                     .config(channel.config())
316                     .handlers(new SnappyFrameEncoder())
317                     .build();
318         }
319         // 'identity' or unsupported
320         return null;
321     }
322 
323     /**
324      * Returns the expected content encoding of the decoded content. Returning {@code contentEncoding} is the default
325      * behavior, which is the case for most compressors.
326      *
327      * @param contentEncoding the value of the {@code content-encoding} header
328      * @return the expected content encoding of the new content.
329      * @throws Http2Exception if the {@code contentEncoding} is not supported and warrants an exception
330      */
331     protected CharSequence getTargetContentEncoding(CharSequence contentEncoding) throws Http2Exception {
332         return contentEncoding;
333     }
334 
335     /**
336      * Generate a new instance of an {@link EmbeddedChannel} capable of compressing data
337      * @param ctx the context.
338      * @param wrapper Defines what type of encoder should be used
339      */
340     private EmbeddedChannel newCompressionChannel(final ChannelHandlerContext ctx, ZlibWrapper wrapper) {
341         Channel channel = ctx.channel();
342         if (supportsCompressionOptions) {
343             if (wrapper == ZlibWrapper.GZIP && gzipCompressionOptions != null) {
344                 return EmbeddedChannel.builder()
345                         .channelId(channel.id())
346                         .hasDisconnect(channel.metadata().hasDisconnect())
347                         .config(channel.config())
348                         .handlers(ZlibCodecFactory.newZlibEncoder(wrapper,
349                                 gzipCompressionOptions.compressionLevel(),
350                                 gzipCompressionOptions.windowBits(),
351                                 gzipCompressionOptions.memLevel())
352                         )
353                         .build();
354             } else if (wrapper == ZlibWrapper.ZLIB && deflateOptions != null) {
355                 return EmbeddedChannel.builder()
356                         .channelId(channel.id())
357                         .hasDisconnect(channel.metadata().hasDisconnect())
358                         .config(channel.config())
359                         .handlers(ZlibCodecFactory.newZlibEncoder(wrapper,
360                                 deflateOptions.compressionLevel(),
361                                 deflateOptions.windowBits(),
362                                 deflateOptions.memLevel())
363                         )
364                         .build();
365             } else {
366                 throw new IllegalArgumentException("Unsupported ZlibWrapper: " + wrapper);
367             }
368         } else {
369             return EmbeddedChannel.builder()
370                     .channelId(channel.id())
371                     .hasDisconnect(channel.metadata().hasDisconnect())
372                     .config(channel.config())
373                     .handlers(ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits, memLevel))
374                     .build();
375         }
376     }
377 
378     /**
379      * Checks if a new compressor object is needed for the stream identified by {@code streamId}. This method will
380      * modify the {@code content-encoding} header contained in {@code headers}.
381      *
382      * @param ctx the context.
383      * @param headers Object representing headers which are to be written
384      * @param endOfStream Indicates if the stream has ended
385      * @return The channel used to compress data.
386      * @throws Http2Exception if any problems occur during initialization.
387      */
388     private EmbeddedChannel newCompressor(ChannelHandlerContext ctx, Http2Headers headers, boolean endOfStream)
389             throws Http2Exception {
390         if (endOfStream) {
391             return null;
392         }
393 
394         CharSequence encoding = headers.get(CONTENT_ENCODING);
395         if (encoding == null) {
396             encoding = IDENTITY;
397         }
398         final EmbeddedChannel compressor = newContentCompressor(ctx, encoding);
399         if (compressor != null) {
400             CharSequence targetContentEncoding = getTargetContentEncoding(encoding);
401             if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
402                 headers.remove(CONTENT_ENCODING);
403             } else {
404                 headers.set(CONTENT_ENCODING, targetContentEncoding);
405             }
406 
407             // The content length will be for the decompressed data. Since we will compress the data
408             // this content-length will not be correct. Instead of queuing messages or delaying sending
409             // header frames...just remove the content-length header
410             headers.remove(CONTENT_LENGTH);
411         }
412 
413         return compressor;
414     }
415 
416     /**
417      * Called after the super class has written the headers and created any associated stream objects.
418      * @param compressor The compressor associated with the stream identified by {@code streamId}.
419      * @param streamId The stream id for which the headers were written.
420      */
421     private void bindCompressorToStream(EmbeddedChannel compressor, int streamId) {
422         if (compressor != null) {
423             Http2Stream stream = connection().stream(streamId);
424             if (stream != null) {
425                 stream.setProperty(propertyKey, compressor);
426             }
427         }
428     }
429 
430     /**
431      * Release remaining content from {@link EmbeddedChannel} and remove the compressor from the {@link Http2Stream}.
432      *
433      * @param stream The stream for which {@code compressor} is the compressor for
434      * @param compressor The compressor for {@code stream}
435      */
436     void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
437         compressor.finishAndReleaseAll();
438         stream.removeProperty(propertyKey);
439     }
440 
441     /**
442      * Read the next compressed {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist.
443      *
444      * @param compressor The channel to read from
445      * @return The next decoded {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist
446      */
447     private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
448         for (;;) {
449             final ByteBuf buf = compressor.readOutbound();
450             if (buf == null) {
451                 return null;
452             }
453             if (!buf.isReadable()) {
454                 buf.release();
455                 continue;
456             }
457             return buf;
458         }
459     }
460 }