View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.handler.codec.MessageToByteEncoder;
24  import io.netty.util.concurrent.EventExecutor;
25  import io.netty.util.concurrent.PromiseNotifier;
26  
27  import static io.netty.handler.codec.compression.Bzip2Constants.BASE_BLOCK_SIZE;
28  import static io.netty.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_1;
29  import static io.netty.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_2;
30  import static io.netty.handler.codec.compression.Bzip2Constants.MAGIC_NUMBER;
31  import static io.netty.handler.codec.compression.Bzip2Constants.MAX_BLOCK_SIZE;
32  import static io.netty.handler.codec.compression.Bzip2Constants.MIN_BLOCK_SIZE;
33  
34  /**
35   * Compresses a {@link ByteBuf} using the Bzip2 algorithm.
36   *
37   * See <a href="https://en.wikipedia.org/wiki/Bzip2">Bzip2</a>.
38   */
39  public class Bzip2Encoder extends MessageToByteEncoder<ByteBuf> {
40      /**
41       * Current state of stream.
42       */
43      private enum State {
44          INIT,
45          INIT_BLOCK,
46          WRITE_DATA,
47          CLOSE_BLOCK
48      }
49  
50      private State currentState = State.INIT;
51  
52      /**
53       * A writer that provides bit-level writes.
54       */
55      private final Bzip2BitWriter writer = new Bzip2BitWriter();
56  
57      /**
58       * The declared maximum block size of the stream (before final run-length decoding).
59       */
60      private final int streamBlockSize;
61  
62      /**
63       * The merged CRC of all blocks compressed so far.
64       */
65      private int streamCRC;
66  
67      /**
68       * The compressor for the current block.
69       */
70      private Bzip2BlockCompressor blockCompressor;
71  
72      /**
73       * (@code true} if the compressed stream has been finished, otherwise {@code false}.
74       */
75      private volatile boolean finished;
76  
77      /**
78       * Used to interact with its {@link ChannelPipeline} and other handlers.
79       */
80      private volatile ChannelHandlerContext ctx;
81  
82      /**
83       * Creates a new bzip2 encoder with the maximum (900,000 byte) block size.
84       */
85      public Bzip2Encoder() {
86          this(MAX_BLOCK_SIZE);
87      }
88  
89      /**
90       * Creates a new bzip2 encoder with the specified {@code blockSizeMultiplier}.
91       * @param blockSizeMultiplier
92       *        The Bzip2 block size as a multiple of 100,000 bytes (minimum {@code 1}, maximum {@code 9}).
93       *        Larger block sizes require more memory for both compression and decompression,
94       *        but give better compression ratios. {@code 9} will usually be the best value to use.
95       */
96      public Bzip2Encoder(final int blockSizeMultiplier) {
97          super(ByteBuf.class);
98          if (blockSizeMultiplier < MIN_BLOCK_SIZE || blockSizeMultiplier > MAX_BLOCK_SIZE) {
99              throw new IllegalArgumentException(
100                     "blockSizeMultiplier: " + blockSizeMultiplier + " (expected: 1-9)");
101         }
102         streamBlockSize = blockSizeMultiplier * BASE_BLOCK_SIZE;
103     }
104 
105     @Override
106     protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
107         if (finished) {
108             out.writeBytes(in);
109             return;
110         }
111 
112         for (;;) {
113             switch (currentState) {
114                 case INIT:
115                     out.ensureWritable(4);
116                     out.writeMedium(MAGIC_NUMBER);
117                     out.writeByte('0' + streamBlockSize / BASE_BLOCK_SIZE);
118                     currentState = State.INIT_BLOCK;
119                     // fall through
120                 case INIT_BLOCK:
121                     blockCompressor = new Bzip2BlockCompressor(writer, streamBlockSize);
122                     currentState = State.WRITE_DATA;
123                     // fall through
124                 case WRITE_DATA:
125                     if (!in.isReadable()) {
126                         return;
127                     }
128                     Bzip2BlockCompressor blockCompressor = this.blockCompressor;
129                     final int length = Math.min(in.readableBytes(), blockCompressor.availableSize());
130                     final int bytesWritten = blockCompressor.write(in, in.readerIndex(), length);
131                     in.skipBytes(bytesWritten);
132                     if (!blockCompressor.isFull()) {
133                         if (in.isReadable()) {
134                             break;
135                         } else {
136                             return;
137                         }
138                     }
139                     currentState = State.CLOSE_BLOCK;
140                     // fall through
141                 case CLOSE_BLOCK:
142                     closeBlock(out);
143                     currentState = State.INIT_BLOCK;
144                     break;
145                 default:
146                     throw new IllegalStateException();
147             }
148         }
149     }
150 
151     /**
152      * Close current block and update {@link #streamCRC}.
153      */
154     private void closeBlock(ByteBuf out) {
155         final Bzip2BlockCompressor blockCompressor = this.blockCompressor;
156         if (!blockCompressor.isEmpty()) {
157             blockCompressor.close(out);
158             final int blockCRC = blockCompressor.crc();
159             streamCRC = (streamCRC << 1 | streamCRC >>> 31) ^ blockCRC;
160         }
161     }
162 
163     /**
164      * Returns {@code true} if and only if the end of the compressed stream has been reached.
165      */
166     public boolean isClosed() {
167         return finished;
168     }
169 
170     /**
171      * Close this {@link Bzip2Encoder} and so finish the encoding.
172      *
173      * The returned {@link ChannelFuture} will be notified once the operation completes.
174      */
175     public ChannelFuture close() {
176         return close(ctx().newPromise());
177     }
178 
179     /**
180      * Close this {@link Bzip2Encoder} and so finish the encoding.
181      * The given {@link ChannelFuture} will be notified once the operation
182      * completes and will also be returned.
183      */
184     public ChannelFuture close(final ChannelPromise promise) {
185         ChannelHandlerContext ctx = ctx();
186         EventExecutor executor = ctx.executor();
187         if (executor.inEventLoop()) {
188             return finishEncode(ctx, promise);
189         } else {
190             executor.execute(new Runnable() {
191                 @Override
192                 public void run() {
193                     ChannelFuture f = finishEncode(ctx(), promise);
194                     PromiseNotifier.cascade(f, promise);
195                 }
196             });
197             return promise;
198         }
199     }
200 
201     @Override
202     public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
203         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
204         EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
205     }
206 
207     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
208         if (finished) {
209             promise.setSuccess();
210             return promise;
211         }
212         finished = true;
213 
214         final ByteBuf footer = ctx.alloc().buffer();
215         closeBlock(footer);
216 
217         final int streamCRC = this.streamCRC;
218         final Bzip2BitWriter writer = this.writer;
219         try {
220             writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_1);
221             writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_2);
222             writer.writeInt(footer, streamCRC);
223             writer.flush(footer);
224         } finally {
225             blockCompressor = null;
226         }
227         return ctx.writeAndFlush(footer, promise);
228     }
229 
230     private ChannelHandlerContext ctx() {
231         ChannelHandlerContext ctx = this.ctx;
232         if (ctx == null) {
233             throw new IllegalStateException("not added to a pipeline");
234         }
235         return ctx;
236     }
237 
238     @Override
239     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
240         this.ctx = ctx;
241     }
242 }