View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.Unpooled;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.channel.embedded.EmbeddedChannel;
23  import io.netty.handler.codec.ByteToMessageDecoder;
24  import io.netty.handler.codec.compression.ZlibCodecFactory;
25  import io.netty.handler.codec.compression.ZlibWrapper;
26  import io.netty.util.concurrent.PromiseCombiner;
27  import io.netty.util.internal.UnstableApi;
28  
29  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
30  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
31  import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
32  import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
33  import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
34  import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
35  import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
36  
37  /**
38   * A decorating HTTP2 encoder that will compress data frames according to the {@code content-encoding} header for each
39   * stream. The compression provided by this class will be applied to the data for the entire stream.
40   */
41  @UnstableApi
42  public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
43      public static final int DEFAULT_COMPRESSION_LEVEL = 6;
44      public static final int DEFAULT_WINDOW_BITS = 15;
45      public static final int DEFAULT_MEM_LEVEL = 8;
46  
47      private final int compressionLevel;
48      private final int windowBits;
49      private final int memLevel;
50      private final Http2Connection.PropertyKey propertyKey;
51  
52      public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
53          this(delegate, DEFAULT_COMPRESSION_LEVEL, DEFAULT_WINDOW_BITS, DEFAULT_MEM_LEVEL);
54      }
55  
56      public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate, int compressionLevel, int windowBits,
57                                              int memLevel) {
58          super(delegate);
59          if (compressionLevel < 0 || compressionLevel > 9) {
60              throw new IllegalArgumentException("compressionLevel: " + compressionLevel + " (expected: 0-9)");
61          }
62          if (windowBits < 9 || windowBits > 15) {
63              throw new IllegalArgumentException("windowBits: " + windowBits + " (expected: 9-15)");
64          }
65          if (memLevel < 1 || memLevel > 9) {
66              throw new IllegalArgumentException("memLevel: " + memLevel + " (expected: 1-9)");
67          }
68          this.compressionLevel = compressionLevel;
69          this.windowBits = windowBits;
70          this.memLevel = memLevel;
71  
72          propertyKey = connection().newKey();
73          connection().addListener(new Http2ConnectionAdapter() {
74              @Override
75              public void onStreamRemoved(Http2Stream stream) {
76                  final EmbeddedChannel compressor = stream.getProperty(propertyKey);
77                  if (compressor != null) {
78                      cleanup(stream, compressor);
79                  }
80              }
81          });
82      }
83  
84      @Override
85      public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
86              final boolean endOfStream, ChannelPromise promise) {
87          final Http2Stream stream = connection().stream(streamId);
88          final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
89          if (channel == null) {
90              // The compressor may be null if no compatible encoding type was found in this stream's headers
91              return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
92          }
93  
94          try {
95              // The channel will release the buffer after being written
96              channel.writeOutbound(data);
97              ByteBuf buf = nextReadableBuf(channel);
98              if (buf == null) {
99                  if (endOfStream) {
100                     if (channel.finish()) {
101                         buf = nextReadableBuf(channel);
102                     }
103                     return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
104                             true, promise);
105                 }
106                 // END_STREAM is not set and the assumption is data is still forthcoming.
107                 promise.setSuccess();
108                 return promise;
109             }
110 
111             PromiseCombiner combiner = new PromiseCombiner();
112             for (;;) {
113                 ByteBuf nextBuf = nextReadableBuf(channel);
114                 boolean compressedEndOfStream = nextBuf == null && endOfStream;
115                 if (compressedEndOfStream && channel.finish()) {
116                     nextBuf = nextReadableBuf(channel);
117                     compressedEndOfStream = nextBuf == null;
118                 }
119 
120                 ChannelPromise bufPromise = ctx.newPromise();
121                 combiner.add(bufPromise);
122                 super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
123                 if (nextBuf == null) {
124                     break;
125                 }
126 
127                 padding = 0; // Padding is only communicated once on the first iteration
128                 buf = nextBuf;
129             }
130             combiner.finish(promise);
131         } catch (Throwable cause) {
132             promise.tryFailure(cause);
133         } finally {
134             if (endOfStream) {
135                 cleanup(stream, channel);
136             }
137         }
138         return promise;
139     }
140 
141     @Override
142     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
143             boolean endStream, ChannelPromise promise) {
144         try {
145             // Determine if compression is required and sanitize the headers.
146             EmbeddedChannel compressor = newCompressor(ctx, headers, endStream);
147 
148             // Write the headers and create the stream object.
149             ChannelFuture future = super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
150 
151             // After the stream object has been created, then attach the compressor as a property for data compression.
152             bindCompressorToStream(compressor, streamId);
153 
154             return future;
155         } catch (Throwable e) {
156             promise.tryFailure(e);
157         }
158         return promise;
159     }
160 
161     @Override
162     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
163             final int streamDependency, final short weight, final boolean exclusive, final int padding,
164             final boolean endOfStream, final ChannelPromise promise) {
165         try {
166             // Determine if compression is required and sanitize the headers.
167             EmbeddedChannel compressor = newCompressor(ctx, headers, endOfStream);
168 
169             // Write the headers and create the stream object.
170             ChannelFuture future = super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
171                                                       padding, endOfStream, promise);
172 
173             // After the stream object has been created, then attach the compressor as a property for data compression.
174             bindCompressorToStream(compressor, streamId);
175 
176             return future;
177         } catch (Throwable e) {
178             promise.tryFailure(e);
179         }
180         return promise;
181     }
182 
183     /**
184      * Returns a new {@link EmbeddedChannel} that encodes the HTTP2 message content encoded in the specified
185      * {@code contentEncoding}.
186      *
187      * @param ctx the context.
188      * @param contentEncoding the value of the {@code content-encoding} header
189      * @return a new {@link ByteToMessageDecoder} if the specified encoding is supported. {@code null} otherwise
190      * (alternatively, you can throw a {@link Http2Exception} to block unknown encoding).
191      * @throws Http2Exception If the specified encoding is not not supported and warrants an exception
192      */
193     protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSequence contentEncoding)
194             throws Http2Exception {
195         if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
196             return newCompressionChannel(ctx, ZlibWrapper.GZIP);
197         }
198         if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
199             return newCompressionChannel(ctx, ZlibWrapper.ZLIB);
200         }
201         // 'identity' or unsupported
202         return null;
203     }
204 
205     /**
206      * Returns the expected content encoding of the decoded content. Returning {@code contentEncoding} is the default
207      * behavior, which is the case for most compressors.
208      *
209      * @param contentEncoding the value of the {@code content-encoding} header
210      * @return the expected content encoding of the new content.
211      * @throws Http2Exception if the {@code contentEncoding} is not supported and warrants an exception
212      */
213     protected CharSequence getTargetContentEncoding(CharSequence contentEncoding) throws Http2Exception {
214         return contentEncoding;
215     }
216 
217     /**
218      * Generate a new instance of an {@link EmbeddedChannel} capable of compressing data
219      * @param ctx the context.
220      * @param wrapper Defines what type of encoder should be used
221      */
222     private EmbeddedChannel newCompressionChannel(final ChannelHandlerContext ctx, ZlibWrapper wrapper) {
223         return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
224                 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits,
225                 memLevel));
226     }
227 
228     /**
229      * Checks if a new compressor object is needed for the stream identified by {@code streamId}. This method will
230      * modify the {@code content-encoding} header contained in {@code headers}.
231      *
232      * @param ctx the context.
233      * @param headers Object representing headers which are to be written
234      * @param endOfStream Indicates if the stream has ended
235      * @return The channel used to compress data.
236      * @throws Http2Exception if any problems occur during initialization.
237      */
238     private EmbeddedChannel newCompressor(ChannelHandlerContext ctx, Http2Headers headers, boolean endOfStream)
239             throws Http2Exception {
240         if (endOfStream) {
241             return null;
242         }
243 
244         CharSequence encoding = headers.get(CONTENT_ENCODING);
245         if (encoding == null) {
246             encoding = IDENTITY;
247         }
248         final EmbeddedChannel compressor = newContentCompressor(ctx, encoding);
249         if (compressor != null) {
250             CharSequence targetContentEncoding = getTargetContentEncoding(encoding);
251             if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
252                 headers.remove(CONTENT_ENCODING);
253             } else {
254                 headers.set(CONTENT_ENCODING, targetContentEncoding);
255             }
256 
257             // The content length will be for the decompressed data. Since we will compress the data
258             // this content-length will not be correct. Instead of queuing messages or delaying sending
259             // header frames...just remove the content-length header
260             headers.remove(CONTENT_LENGTH);
261         }
262 
263         return compressor;
264     }
265 
266     /**
267      * Called after the super class has written the headers and created any associated stream objects.
268      * @param compressor The compressor associated with the stream identified by {@code streamId}.
269      * @param streamId The stream id for which the headers were written.
270      */
271     private void bindCompressorToStream(EmbeddedChannel compressor, int streamId) {
272         if (compressor != null) {
273             Http2Stream stream = connection().stream(streamId);
274             if (stream != null) {
275                 stream.setProperty(propertyKey, compressor);
276             }
277         }
278     }
279 
280     /**
281      * Release remaining content from {@link EmbeddedChannel} and remove the compressor from the {@link Http2Stream}.
282      *
283      * @param stream The stream for which {@code compressor} is the compressor for
284      * @param compressor The compressor for {@code stream}
285      */
286     void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
287         if (compressor.finish()) {
288             for (;;) {
289                 final ByteBuf buf = compressor.readOutbound();
290                 if (buf == null) {
291                     break;
292                 }
293 
294                 buf.release();
295             }
296         }
297         stream.removeProperty(propertyKey);
298     }
299 
300     /**
301      * Read the next compressed {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist.
302      *
303      * @param compressor The channel to read from
304      * @return The next decoded {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist
305      */
306     private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
307         for (;;) {
308             final ByteBuf buf = compressor.readOutbound();
309             if (buf == null) {
310                 return null;
311             }
312             if (!buf.isReadable()) {
313                 buf.release();
314                 continue;
315             }
316             return buf;
317         }
318     }
319 }