View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.handler.codec.MessageToByteEncoder;
25  import io.netty.util.concurrent.EventExecutor;
26  import io.netty.util.concurrent.PromiseNotifier;
27  
28  import java.util.concurrent.TimeUnit;
29  
30  import static io.netty.handler.codec.compression.Bzip2Constants.BASE_BLOCK_SIZE;
31  import static io.netty.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_1;
32  import static io.netty.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_2;
33  import static io.netty.handler.codec.compression.Bzip2Constants.MAGIC_NUMBER;
34  import static io.netty.handler.codec.compression.Bzip2Constants.MAX_BLOCK_SIZE;
35  import static io.netty.handler.codec.compression.Bzip2Constants.MIN_BLOCK_SIZE;
36  import static io.netty.handler.codec.compression.Bzip2Constants.THREAD_POOL_DELAY_SECONDS;
37  
38  /**
39   * Compresses a {@link ByteBuf} using the Bzip2 algorithm.
40   *
41   * See <a href="https://en.wikipedia.org/wiki/Bzip2">Bzip2</a>.
42   */
43  public class Bzip2Encoder extends MessageToByteEncoder<ByteBuf> {
44      /**
45       * Current state of stream.
46       */
47      private enum State {
48          INIT,
49          INIT_BLOCK,
50          WRITE_DATA,
51          CLOSE_BLOCK
52      }
53  
54      private State currentState = State.INIT;
55  
56      /**
57       * A writer that provides bit-level writes.
58       */
59      private final Bzip2BitWriter writer = new Bzip2BitWriter();
60  
61      /**
62       * The declared maximum block size of the stream (before final run-length decoding).
63       */
64      private final int streamBlockSize;
65  
66      /**
67       * The merged CRC of all blocks compressed so far.
68       */
69      private int streamCRC;
70  
71      /**
72       * The compressor for the current block.
73       */
74      private Bzip2BlockCompressor blockCompressor;
75  
76      /**
77       * (@code true} if the compressed stream has been finished, otherwise {@code false}.
78       */
79      private volatile boolean finished;
80  
81      /**
82       * Used to interact with its {@link ChannelPipeline} and other handlers.
83       */
84      private volatile ChannelHandlerContext ctx;
85  
86      /**
87       * Creates a new bzip2 encoder with the maximum (900,000 byte) block size.
88       */
89      public Bzip2Encoder() {
90          this(MAX_BLOCK_SIZE);
91      }
92  
93      /**
94       * Creates a new bzip2 encoder with the specified {@code blockSizeMultiplier}.
95       * @param blockSizeMultiplier
96       *        The Bzip2 block size as a multiple of 100,000 bytes (minimum {@code 1}, maximum {@code 9}).
97       *        Larger block sizes require more memory for both compression and decompression,
98       *        but give better compression ratios. {@code 9} will usually be the best value to use.
99       */
100     public Bzip2Encoder(final int blockSizeMultiplier) {
101         if (blockSizeMultiplier < MIN_BLOCK_SIZE || blockSizeMultiplier > MAX_BLOCK_SIZE) {
102             throw new IllegalArgumentException(
103                     "blockSizeMultiplier: " + blockSizeMultiplier + " (expected: 1-9)");
104         }
105         streamBlockSize = blockSizeMultiplier * BASE_BLOCK_SIZE;
106     }
107 
108     @Override
109     protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
110         if (finished) {
111             out.writeBytes(in);
112             return;
113         }
114 
115         for (;;) {
116             switch (currentState) {
117                 case INIT:
118                     out.ensureWritable(4);
119                     out.writeMedium(MAGIC_NUMBER);
120                     out.writeByte('0' + streamBlockSize / BASE_BLOCK_SIZE);
121                     currentState = State.INIT_BLOCK;
122                     // fall through
123                 case INIT_BLOCK:
124                     blockCompressor = new Bzip2BlockCompressor(writer, streamBlockSize);
125                     currentState = State.WRITE_DATA;
126                     // fall through
127                 case WRITE_DATA:
128                     if (!in.isReadable()) {
129                         return;
130                     }
131                     Bzip2BlockCompressor blockCompressor = this.blockCompressor;
132                     final int length = Math.min(in.readableBytes(), blockCompressor.availableSize());
133                     final int bytesWritten = blockCompressor.write(in, in.readerIndex(), length);
134                     in.skipBytes(bytesWritten);
135                     if (!blockCompressor.isFull()) {
136                         if (in.isReadable()) {
137                             break;
138                         } else {
139                             return;
140                         }
141                     }
142                     currentState = State.CLOSE_BLOCK;
143                     // fall through
144                 case CLOSE_BLOCK:
145                     closeBlock(out);
146                     currentState = State.INIT_BLOCK;
147                     break;
148                 default:
149                     throw new IllegalStateException();
150             }
151         }
152     }
153 
154     /**
155      * Close current block and update {@link #streamCRC}.
156      */
157     private void closeBlock(ByteBuf out) {
158         final Bzip2BlockCompressor blockCompressor = this.blockCompressor;
159         if (!blockCompressor.isEmpty()) {
160             blockCompressor.close(out);
161             final int blockCRC = blockCompressor.crc();
162             streamCRC = (streamCRC << 1 | streamCRC >>> 31) ^ blockCRC;
163         }
164     }
165 
166     /**
167      * Returns {@code true} if and only if the end of the compressed stream has been reached.
168      */
169     public boolean isClosed() {
170         return finished;
171     }
172 
173     /**
174      * Close this {@link Bzip2Encoder} and so finish the encoding.
175      *
176      * The returned {@link ChannelFuture} will be notified once the operation completes.
177      */
178     public ChannelFuture close() {
179         return close(ctx().newPromise());
180     }
181 
182     /**
183      * Close this {@link Bzip2Encoder} and so finish the encoding.
184      * The given {@link ChannelFuture} will be notified once the operation
185      * completes and will also be returned.
186      */
187     public ChannelFuture close(final ChannelPromise promise) {
188         ChannelHandlerContext ctx = ctx();
189         EventExecutor executor = ctx.executor();
190         if (executor.inEventLoop()) {
191             return finishEncode(ctx, promise);
192         } else {
193             executor.execute(new Runnable() {
194                 @Override
195                 public void run() {
196                     ChannelFuture f = finishEncode(ctx(), promise);
197                     PromiseNotifier.cascade(f, promise);
198                 }
199             });
200             return promise;
201         }
202     }
203 
204     @Override
205     public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
206         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
207         f.addListener(new ChannelFutureListener() {
208             @Override
209             public void operationComplete(ChannelFuture f) throws Exception {
210                 ctx.close(promise);
211             }
212         });
213 
214         if (!f.isDone()) {
215             // Ensure the channel is closed even if the write operation completes in time.
216             ctx.executor().schedule(new Runnable() {
217                 @Override
218                 public void run() {
219                     ctx.close(promise);
220                 }
221             }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);
222         }
223     }
224 
225     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
226         if (finished) {
227             promise.setSuccess();
228             return promise;
229         }
230         finished = true;
231 
232         final ByteBuf footer = ctx.alloc().buffer();
233         closeBlock(footer);
234 
235         final int streamCRC = this.streamCRC;
236         final Bzip2BitWriter writer = this.writer;
237         try {
238             writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_1);
239             writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_2);
240             writer.writeInt(footer, streamCRC);
241             writer.flush(footer);
242         } finally {
243             blockCompressor = null;
244         }
245         return ctx.writeAndFlush(footer, promise);
246     }
247 
248     private ChannelHandlerContext ctx() {
249         ChannelHandlerContext ctx = this.ctx;
250         if (ctx == null) {
251             throw new IllegalStateException("not added to a pipeline");
252         }
253         return ctx;
254     }
255 
256     @Override
257     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
258         this.ctx = ctx;
259     }
260 }