View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
18  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
19  import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
20  import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
21  import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
22  import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
23  import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
24  import io.netty.buffer.ByteBuf;
25  import io.netty.buffer.Unpooled;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelHandlerContext;
28  import io.netty.channel.ChannelPromise;
29  import io.netty.channel.ChannelPromiseAggregator;
30  import io.netty.channel.embedded.EmbeddedChannel;
31  import io.netty.handler.codec.AsciiString;
32  import io.netty.handler.codec.ByteToMessageDecoder;
33  import io.netty.handler.codec.compression.ZlibCodecFactory;
34  import io.netty.handler.codec.compression.ZlibWrapper;
35  
36  /**
37   * A HTTP2 encoder that will compress data frames according to the {@code content-encoding} header for each stream.
38   * The compression provided by this class will be applied to the data for the entire stream.
39   */
40  public class CompressorHttp2ConnectionEncoder extends DefaultHttp2ConnectionEncoder {
41      private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
42          @Override
43          public void streamRemoved(Http2Stream stream) {
44              final EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class);
45              if (compressor != null) {
46                  cleanup(stream, compressor);
47              }
48          }
49      };
50  
51      private final int compressionLevel;
52      private final int windowBits;
53      private final int memLevel;
54  
55      /**
56       * Builder for new instances of {@link CompressorHttp2ConnectionEncoder}
57       */
58      public static class Builder extends DefaultHttp2ConnectionEncoder.Builder {
59          protected int compressionLevel = 6;
60          protected int windowBits = 15;
61          protected int memLevel = 8;
62  
63          public Builder compressionLevel(int compressionLevel) {
64              this.compressionLevel = compressionLevel;
65              return this;
66          }
67  
68          public Builder windowBits(int windowBits) {
69              this.windowBits = windowBits;
70              return this;
71          }
72  
73          public Builder memLevel(int memLevel) {
74              this.memLevel = memLevel;
75              return this;
76          }
77  
78          @Override
79          public CompressorHttp2ConnectionEncoder build() {
80              return new CompressorHttp2ConnectionEncoder(this);
81          }
82      }
83  
84      protected CompressorHttp2ConnectionEncoder(Builder builder) {
85          super(builder);
86          if (builder.compressionLevel < 0 || builder.compressionLevel > 9) {
87              throw new IllegalArgumentException("compressionLevel: " + builder.compressionLevel + " (expected: 0-9)");
88          }
89          if (builder.windowBits < 9 || builder.windowBits > 15) {
90              throw new IllegalArgumentException("windowBits: " + builder.windowBits + " (expected: 9-15)");
91          }
92          if (builder.memLevel < 1 || builder.memLevel > 9) {
93              throw new IllegalArgumentException("memLevel: " + builder.memLevel + " (expected: 1-9)");
94          }
95          compressionLevel = builder.compressionLevel;
96          windowBits = builder.windowBits;
97          memLevel = builder.memLevel;
98  
99          connection().addListener(CLEAN_UP_LISTENER);
100     }
101 
102     @Override
103     public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
104             final boolean endOfStream, ChannelPromise promise) {
105         final Http2Stream stream = connection().stream(streamId);
106         final EmbeddedChannel channel = stream == null ? null :
107             (EmbeddedChannel) stream.getProperty(CompressorHttp2ConnectionEncoder.class);
108         if (channel == null) {
109             // The compressor may be null if no compatible encoding type was found in this stream's headers
110             return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
111         }
112 
113         try {
114             // The channel will release the buffer after being written
115             channel.writeOutbound(data);
116             ByteBuf buf = nextReadableBuf(channel);
117             if (buf == null) {
118                 if (endOfStream) {
119                     if (channel.finish()) {
120                         buf = nextReadableBuf(channel);
121                     }
122                     return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
123                             true, promise);
124                 }
125                 // END_STREAM is not set and the assumption is data is still forthcoming.
126                 promise.setSuccess();
127                 return promise;
128             }
129 
130             ChannelPromiseAggregator aggregator = new ChannelPromiseAggregator(promise);
131             ChannelPromise bufPromise = ctx.newPromise();
132             aggregator.add(bufPromise);
133             for (;;) {
134                 ByteBuf nextBuf = nextReadableBuf(channel);
135                 boolean compressedEndOfStream = nextBuf == null && endOfStream;
136                 if (compressedEndOfStream && channel.finish()) {
137                     nextBuf = nextReadableBuf(channel);
138                     compressedEndOfStream = nextBuf == null;
139                 }
140 
141                 final ChannelPromise nextPromise;
142                 if (nextBuf != null) {
143                     // We have to add the nextPromise to the aggregator before doing the current write. This is so
144                     // completing the current write before the next write is done won't complete the aggregate promise
145                     nextPromise = ctx.newPromise();
146                     aggregator.add(nextPromise);
147                 } else {
148                     nextPromise = null;
149                 }
150 
151                 super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
152                 if (nextBuf == null) {
153                     break;
154                 }
155 
156                 padding = 0; // Padding is only communicated once on the first iteration
157                 buf = nextBuf;
158                 bufPromise = nextPromise;
159             }
160             return promise;
161         } finally {
162             if (endOfStream) {
163                 cleanup(stream, channel);
164             }
165         }
166     }
167 
168     @Override
169     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
170             boolean endStream, ChannelPromise promise) {
171         initCompressor(streamId, headers, endStream);
172         return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
173     }
174 
175     @Override
176     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
177             final int streamDependency, final short weight, final boolean exclusive, final int padding,
178             final boolean endOfStream, final ChannelPromise promise) {
179         initCompressor(streamId, headers, endOfStream);
180         return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream,
181                 promise);
182     }
183 
184     /**
185      * Returns a new {@link EmbeddedChannel} that encodes the HTTP2 message content encoded in the specified
186      * {@code contentEncoding}.
187      *
188      * @param contentEncoding the value of the {@code content-encoding} header
189      * @return a new {@link ByteToMessageDecoder} if the specified encoding is supported. {@code null} otherwise
190      * (alternatively, you can throw a {@link Http2Exception} to block unknown encoding).
191      * @throws Http2Exception If the specified encoding is not not supported and warrants an exception
192      */
193     protected EmbeddedChannel newContentCompressor(AsciiString contentEncoding) throws Http2Exception {
194         if (GZIP.equalsIgnoreCase(contentEncoding) || X_GZIP.equalsIgnoreCase(contentEncoding)) {
195             return newCompressionChannel(ZlibWrapper.GZIP);
196         }
197         if (DEFLATE.equalsIgnoreCase(contentEncoding) || X_DEFLATE.equalsIgnoreCase(contentEncoding)) {
198             return newCompressionChannel(ZlibWrapper.ZLIB);
199         }
200         // 'identity' or unsupported
201         return null;
202     }
203 
204     /**
205      * Returns the expected content encoding of the decoded content. Returning {@code contentEncoding} is the default
206      * behavior, which is the case for most compressors.
207      *
208      * @param contentEncoding the value of the {@code content-encoding} header
209      * @return the expected content encoding of the new content.
210      * @throws Http2Exception if the {@code contentEncoding} is not supported and warrants an exception
211      */
212     protected AsciiString getTargetContentEncoding(AsciiString contentEncoding) throws Http2Exception {
213         return contentEncoding;
214     }
215 
216     /**
217      * Generate a new instance of an {@link EmbeddedChannel} capable of compressing data
218      * @param wrapper Defines what type of encoder should be used
219      */
220     private EmbeddedChannel newCompressionChannel(ZlibWrapper wrapper) {
221         return new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits,
222                 memLevel));
223     }
224 
225     /**
226      * Checks if a new compressor object is needed for the stream identified by {@code streamId}. This method will
227      * modify the {@code content-encoding} header contained in {@code headers}.
228      *
229      * @param streamId The identifier for the headers inside {@code headers}
230      * @param headers Object representing headers which are to be written
231      * @param endOfStream Indicates if the stream has ended
232      */
233     private void initCompressor(int streamId, Http2Headers headers, boolean endOfStream) {
234         final Http2Stream stream = connection().stream(streamId);
235         if (stream == null) {
236             return;
237         }
238 
239         EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class);
240         if (compressor == null) {
241             if (!endOfStream) {
242                 AsciiString encoding = headers.get(CONTENT_ENCODING);
243                 if (encoding == null) {
244                     encoding = IDENTITY;
245                 }
246                 try {
247                     compressor = newContentCompressor(encoding);
248                     if (compressor != null) {
249                         stream.setProperty(CompressorHttp2ConnectionEncoder.class, compressor);
250                         AsciiString targetContentEncoding = getTargetContentEncoding(encoding);
251                         if (IDENTITY.equalsIgnoreCase(targetContentEncoding)) {
252                             headers.remove(CONTENT_ENCODING);
253                         } else {
254                             headers.set(CONTENT_ENCODING, targetContentEncoding);
255                         }
256                     }
257                 } catch (Throwable ignored) {
258                     // Ignore
259                 }
260             }
261         } else if (endOfStream) {
262             cleanup(stream, compressor);
263         }
264 
265         if (compressor != null) {
266             // The content length will be for the decompressed data. Since we will compress the data
267             // this content-length will not be correct. Instead of queuing messages or delaying sending
268             // header frames...just remove the content-length header
269             headers.remove(CONTENT_LENGTH);
270         }
271     }
272 
273     /**
274      * Release remaining content from {@link EmbeddedChannel} and remove the compressor from the {@link Http2Stream}.
275      *
276      * @param stream The stream for which {@code compressor} is the compressor for
277      * @param compressor The compressor for {@code stream}
278      */
279     private static void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
280         if (compressor.finish()) {
281             for (;;) {
282                 final ByteBuf buf = compressor.readOutbound();
283                 if (buf == null) {
284                     break;
285                 }
286 
287                 buf.release();
288             }
289         }
290         stream.removeProperty(CompressorHttp2ConnectionEncoder.class);
291     }
292 
293     /**
294      * Read the next compressed {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist.
295      *
296      * @param compressor The channel to read from
297      * @return The next decoded {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist
298      */
299     private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
300         for (;;) {
301             final ByteBuf buf = compressor.readOutbound();
302             if (buf == null) {
303                 return null;
304             }
305             if (!buf.isReadable()) {
306                 buf.release();
307                 continue;
308             }
309             return buf;
310         }
311     }
312 }