View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.ChannelPromiseNotifier;
25  import io.netty.handler.codec.MessageToByteEncoder;
26  import io.netty.util.concurrent.EventExecutor;
27  
28  import java.util.concurrent.TimeUnit;
29  
30  import static io.netty.handler.codec.compression.Bzip2Constants.*;
31  
32  /**
33   * Compresses a {@link ByteBuf} using the Bzip2 algorithm.
34   *
35   * See <a href="http://en.wikipedia.org/wiki/Bzip2">Bzip2</a>.
36   */
37  public class Bzip2Encoder extends MessageToByteEncoder<ByteBuf> {
38      /**
39       * Current state of stream.
40       */
41      private enum State {
42          INIT,
43          INIT_BLOCK,
44          WRITE_DATA,
45          CLOSE_BLOCK
46      }
47  
48      private State currentState = State.INIT;
49  
50      /**
51       * A writer that provides bit-level writes.
52       */
53      private final Bzip2BitWriter writer = new Bzip2BitWriter();
54  
55      /**
56       * The declared maximum block size of the stream (before final run-length decoding).
57       */
58      private final int streamBlockSize;
59  
60      /**
61       * The merged CRC of all blocks compressed so far.
62       */
63      private int streamCRC;
64  
65      /**
66       * The compressor for the current block.
67       */
68      private Bzip2BlockCompressor blockCompressor;
69  
70      /**
71       * (@code true} if the compressed stream has been finished, otherwise {@code false}.
72       */
73      private volatile boolean finished;
74  
75      /**
76       * Used to interact with its {@link ChannelPipeline} and other handlers.
77       */
78      private volatile ChannelHandlerContext ctx;
79  
80      /**
81       * Creates a new bzip2 encoder with the maximum (900,000 byte) block size.
82       */
83      public Bzip2Encoder() {
84          this(MAX_BLOCK_SIZE);
85      }
86  
87      /**
88       * Creates a new bzip2 encoder with the specified {@code blockSizeMultiplier}.
89       * @param blockSizeMultiplier
90       *        The Bzip2 block size as a multiple of 100,000 bytes (minimum {@code 1}, maximum {@code 9}).
91       *        Larger block sizes require more memory for both compression and decompression,
92       *        but give better compression ratios. {@code 9} will usually be the best value to use.
93       */
94      public Bzip2Encoder(final int blockSizeMultiplier) {
95          if (blockSizeMultiplier < MIN_BLOCK_SIZE || blockSizeMultiplier > MAX_BLOCK_SIZE) {
96              throw new IllegalArgumentException(
97                      "blockSizeMultiplier: " + blockSizeMultiplier + " (expected: 1-9)");
98          }
99          streamBlockSize = blockSizeMultiplier * BASE_BLOCK_SIZE;
100     }
101 
102     @Override
103     protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
104         if (finished) {
105             out.writeBytes(in);
106             return;
107         }
108 
109         for (;;) {
110             switch (currentState) {
111                 case INIT:
112                     out.ensureWritable(4);
113                     out.writeMedium(MAGIC_NUMBER);
114                     out.writeByte('0' + streamBlockSize / BASE_BLOCK_SIZE);
115                     currentState = State.INIT_BLOCK;
116                     // fall through
117                 case INIT_BLOCK:
118                     blockCompressor = new Bzip2BlockCompressor(writer, streamBlockSize);
119                     currentState = State.WRITE_DATA;
120                     // fall through
121                 case WRITE_DATA:
122                     if (!in.isReadable()) {
123                         return;
124                     }
125                     Bzip2BlockCompressor blockCompressor = this.blockCompressor;
126                     final int length = Math.min(in.readableBytes(), blockCompressor.availableSize());
127                     final int bytesWritten = blockCompressor.write(in, in.readerIndex(), length);
128                     in.skipBytes(bytesWritten);
129                     if (!blockCompressor.isFull()) {
130                         if (in.isReadable()) {
131                             break;
132                         } else {
133                             return;
134                         }
135                     }
136                     currentState = State.CLOSE_BLOCK;
137                     // fall through
138                 case CLOSE_BLOCK:
139                     closeBlock(out);
140                     currentState = State.INIT_BLOCK;
141                     break;
142                 default:
143                     throw new IllegalStateException();
144             }
145         }
146     }
147 
148     /**
149      * Close current block and update {@link #streamCRC}.
150      */
151     private void closeBlock(ByteBuf out) {
152         final Bzip2BlockCompressor blockCompressor = this.blockCompressor;
153         if (!blockCompressor.isEmpty()) {
154             blockCompressor.close(out);
155             final int blockCRC = blockCompressor.crc();
156             streamCRC = (streamCRC << 1 | streamCRC >>> 31) ^ blockCRC;
157         }
158     }
159 
160     /**
161      * Returns {@code true} if and only if the end of the compressed stream has been reached.
162      */
163     public boolean isClosed() {
164         return finished;
165     }
166 
167     /**
168      * Close this {@link Bzip2Encoder} and so finish the encoding.
169      *
170      * The returned {@link ChannelFuture} will be notified once the operation completes.
171      */
172     public ChannelFuture close() {
173         return close(ctx().newPromise());
174     }
175 
176     /**
177      * Close this {@link Bzip2Encoder} and so finish the encoding.
178      * The given {@link ChannelFuture} will be notified once the operation
179      * completes and will also be returned.
180      */
181     public ChannelFuture close(final ChannelPromise promise) {
182         ChannelHandlerContext ctx = ctx();
183         EventExecutor executor = ctx.executor();
184         if (executor.inEventLoop()) {
185             return finishEncode(ctx, promise);
186         } else {
187             executor.execute(new Runnable() {
188                 @Override
189                 public void run() {
190                     ChannelFuture f = finishEncode(ctx(), promise);
191                     f.addListener(new ChannelPromiseNotifier(promise));
192                 }
193             });
194             return promise;
195         }
196     }
197 
198     @Override
199     public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
200         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
201         f.addListener(new ChannelFutureListener() {
202             @Override
203             public void operationComplete(ChannelFuture f) throws Exception {
204                 ctx.close(promise);
205             }
206         });
207 
208         if (!f.isDone()) {
209             // Ensure the channel is closed even if the write operation completes in time.
210             ctx.executor().schedule(new Runnable() {
211                 @Override
212                 public void run() {
213                     ctx.close(promise);
214                 }
215             }, 10, TimeUnit.SECONDS); // FIXME: Magic number
216         }
217     }
218 
219     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
220         if (finished) {
221             promise.setSuccess();
222             return promise;
223         }
224         finished = true;
225 
226         final ByteBuf footer = ctx.alloc().buffer();
227         closeBlock(footer);
228 
229         final int streamCRC = this.streamCRC;
230         final Bzip2BitWriter writer = this.writer;
231         try {
232             writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_1);
233             writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_2);
234             writer.writeInt(footer, streamCRC);
235             writer.flush(footer);
236         } finally {
237             blockCompressor = null;
238         }
239         return ctx.writeAndFlush(footer, promise);
240     }
241 
242     private ChannelHandlerContext ctx() {
243         ChannelHandlerContext ctx = this.ctx;
244         if (ctx == null) {
245             throw new IllegalStateException("not added to a pipeline");
246         }
247         return ctx;
248     }
249 
250     @Override
251     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
252         this.ctx = ctx;
253     }
254 }