View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.ChannelPromiseNotifier;
25  import io.netty.handler.codec.MessageToByteEncoder;
26  import io.netty.util.concurrent.EventExecutor;
27  
28  import java.util.concurrent.TimeUnit;
29  
30  import static io.netty.handler.codec.compression.Bzip2Constants.*;
31  
32  /**
33   * Compresses a {@link ByteBuf} using the Bzip2 algorithm.
34   *
35   * See <a href="http://en.wikipedia.org/wiki/Bzip2">Bzip2</a>.
36   */
37  public class Bzip2Encoder extends MessageToByteEncoder<ByteBuf> {
38      /**
39       * Current state of stream.
40       */
41      private enum State {
42          INIT,
43          INIT_BLOCK,
44          WRITE_DATA,
45          CLOSE_BLOCK
46      }
47  
48      private State currentState = State.INIT;
49  
50      /**
51       * A writer that provides bit-level writes.
52       */
53      private final Bzip2BitWriter writer = new Bzip2BitWriter();
54  
55      /**
56       * The declared maximum block size of the stream (before final run-length decoding).
57       */
58      private final int streamBlockSize;
59  
60      /**
61       * The merged CRC of all blocks compressed so far.
62       */
63      private int streamCRC;
64  
65      /**
66       * The compressor for the current block.
67       */
68      private Bzip2BlockCompressor blockCompressor;
69  
70      /**
71       * (@code true} if the compressed stream has been finished, otherwise {@code false}.
72       */
73      private volatile boolean finished;
74  
75      /**
76       * Used to interact with its {@link ChannelPipeline} and other handlers.
77       */
78      private volatile ChannelHandlerContext ctx;
79  
80      /**
81       * Creates a new bzip2 encoder with the maximum (900,000 byte) block size.
82       */
83      public Bzip2Encoder() {
84          this(MAX_BLOCK_SIZE);
85      }
86  
87      /**
88       * Creates a new bzip2 encoder with the specified {@code blockSizeMultiplier}.
89       * @param blockSizeMultiplier
90       *        The Bzip2 block size as a multiple of 100,000 bytes (minimum {@code 1}, maximum {@code 9}).
91       *        Larger block sizes require more memory for both compression and decompression,
92       *        but give better compression ratios. {@code 9} will usually be the best value to use.
93       */
94      public Bzip2Encoder(final int blockSizeMultiplier) {
95          if (blockSizeMultiplier < MIN_BLOCK_SIZE || blockSizeMultiplier > MAX_BLOCK_SIZE) {
96              throw new IllegalArgumentException(
97                      "blockSizeMultiplier: " + blockSizeMultiplier + " (expected: 1-9)");
98          }
99          streamBlockSize = blockSizeMultiplier * BASE_BLOCK_SIZE;
100     }
101 
102     @Override
103     protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
104         if (finished) {
105             out.writeBytes(in);
106             return;
107         }
108 
109         for (;;) {
110             switch (currentState) {
111                 case INIT:
112                     out.ensureWritable(4);
113                     out.writeMedium(MAGIC_NUMBER);
114                     out.writeByte('0' + streamBlockSize / BASE_BLOCK_SIZE);
115                     currentState = State.INIT_BLOCK;
116                 case INIT_BLOCK:
117                     blockCompressor = new Bzip2BlockCompressor(writer, streamBlockSize);
118                     currentState = State.WRITE_DATA;
119                 case WRITE_DATA:
120                     if (!in.isReadable()) {
121                         return;
122                     }
123                     Bzip2BlockCompressor blockCompressor = this.blockCompressor;
124                     final int length = in.readableBytes() < blockCompressor.availableSize() ?
125                                     in.readableBytes() : blockCompressor.availableSize();
126                     final int offset;
127                     final byte[] array;
128                     if (in.hasArray()) {
129                         array = in.array();
130                         offset = in.arrayOffset() + in.readerIndex();
131                     } else {
132                         array = new byte[length];
133                         in.getBytes(in.readerIndex(), array);
134                         offset = 0;
135                     }
136                     final int bytesWritten = blockCompressor.write(array, offset, length);
137                     in.skipBytes(bytesWritten);
138                     if (!blockCompressor.isFull()) {
139                         if (in.isReadable()) {
140                             break;
141                         } else {
142                             return;
143                         }
144                     }
145                     currentState = State.CLOSE_BLOCK;
146                 case CLOSE_BLOCK:
147                     closeBlock(out);
148                     currentState = State.INIT_BLOCK;
149                     break;
150                 default:
151                     throw new IllegalStateException();
152             }
153         }
154     }
155 
156     /**
157      * Close current block and update {@link #streamCRC}.
158      */
159     private void closeBlock(ByteBuf out) {
160         final Bzip2BlockCompressor blockCompressor = this.blockCompressor;
161         if (!blockCompressor.isEmpty()) {
162             blockCompressor.close(out);
163             final int blockCRC = blockCompressor.crc();
164             streamCRC = (streamCRC << 1 | streamCRC >>> 31) ^ blockCRC;
165         }
166     }
167 
168     /**
169      * Returns {@code true} if and only if the end of the compressed stream has been reached.
170      */
171     public boolean isClosed() {
172         return finished;
173     }
174 
175     /**
176      * Close this {@link Bzip2Encoder} and so finish the encoding.
177      *
178      * The returned {@link ChannelFuture} will be notified once the operation completes.
179      */
180     public ChannelFuture close() {
181         return close(ctx().newPromise());
182     }
183 
184     /**
185      * Close this {@link Bzip2Encoder} and so finish the encoding.
186      * The given {@link ChannelFuture} will be notified once the operation
187      * completes and will also be returned.
188      */
189     public ChannelFuture close(final ChannelPromise promise) {
190         ChannelHandlerContext ctx = ctx();
191         EventExecutor executor = ctx.executor();
192         if (executor.inEventLoop()) {
193             return finishEncode(ctx, promise);
194         } else {
195             executor.execute(new Runnable() {
196                 @Override
197                 public void run() {
198                     ChannelFuture f = finishEncode(ctx(), promise);
199                     f.addListener(new ChannelPromiseNotifier(promise));
200                 }
201             });
202             return promise;
203         }
204     }
205 
206     @Override
207     public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
208         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
209         f.addListener(new ChannelFutureListener() {
210             @Override
211             public void operationComplete(ChannelFuture f) throws Exception {
212                 ctx.close(promise);
213             }
214         });
215 
216         if (!f.isDone()) {
217             // Ensure the channel is closed even if the write operation completes in time.
218             ctx.executor().schedule(new Runnable() {
219                 @Override
220                 public void run() {
221                     ctx.close(promise);
222                 }
223             }, 10, TimeUnit.SECONDS); // FIXME: Magic number
224         }
225     }
226 
227     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
228         if (finished) {
229             promise.setSuccess();
230             return promise;
231         }
232         finished = true;
233 
234         final ByteBuf footer = ctx.alloc().buffer();
235         closeBlock(footer);
236 
237         final int streamCRC = this.streamCRC;
238         final Bzip2BitWriter writer = this.writer;
239         try {
240             writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_1);
241             writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_2);
242             writer.writeInt(footer, streamCRC);
243             writer.flush(footer);
244         } finally {
245             blockCompressor = null;
246         }
247         return ctx.writeAndFlush(footer, promise);
248     }
249 
250     private ChannelHandlerContext ctx() {
251         ChannelHandlerContext ctx = this.ctx;
252         if (ctx == null) {
253             throw new IllegalStateException("not added to a pipeline");
254         }
255         return ctx;
256     }
257 
258     @Override
259     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
260         this.ctx = ctx;
261     }
262 }