View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.compression;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  
21  import java.util.function.Supplier;
22  
23  import static io.netty5.handler.codec.compression.Bzip2Constants.BASE_BLOCK_SIZE;
24  import static io.netty5.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_1;
25  import static io.netty5.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_2;
26  import static io.netty5.handler.codec.compression.Bzip2Constants.MAGIC_NUMBER;
27  import static io.netty5.handler.codec.compression.Bzip2Constants.MAX_BLOCK_SIZE;
28  import static io.netty5.handler.codec.compression.Bzip2Constants.MIN_BLOCK_SIZE;
29  
30  /**
31   * Compresses a {@link Buffer} using the Bzip2 algorithm.
32   *
33   * See <a href="https://en.wikipedia.org/wiki/Bzip2">Bzip2</a>.
34   */
35  public final class Bzip2Compressor implements Compressor {
36  
37      /**
38       * Creates a new bzip2 compressor with the specified {@code blockSizeMultiplier}.
39       * @param blockSizeMultiplier
40       *        The Bzip2 block size as a multiple of 100,000 bytes (minimum {@code 1}, maximum {@code 9}).
41       *        Larger block sizes require more memory for both compression and decompression,
42       *        but give better compression ratios. {@code 9} will usually be the best value to use.
43       */
44      private Bzip2Compressor(final int blockSizeMultiplier) {
45          streamBlockSize = blockSizeMultiplier * BASE_BLOCK_SIZE;
46      }
47  
48      /**
49       * Creates a new bzip2 compressor factory with the maximum (900,000 byte) block size.
50       *
51       * @return the factory.
52       */
53      public static Supplier<Bzip2Compressor> newFactory() {
54          return newFactory(MAX_BLOCK_SIZE);
55      }
56  
57      /**
58       * Creates a new bzip2 compressor factory with the specified {@code blockSizeMultiplier}.
59       *
60       * @param blockSizeMultiplier
61       *        The Bzip2 block size as a multiple of 100,000 bytes (minimum {@code 1}, maximum {@code 9}).
62       *        Larger block sizes require more memory for both compression and decompression,
63       *        but give better compression ratios. {@code 9} will usually be the best value to use.
64       * @return the factory.
65       */
66      public static Supplier<Bzip2Compressor> newFactory(final int blockSizeMultiplier) {
67          if (blockSizeMultiplier < MIN_BLOCK_SIZE || blockSizeMultiplier > MAX_BLOCK_SIZE) {
68              throw new IllegalArgumentException(
69                      "blockSizeMultiplier: " + blockSizeMultiplier + " (expected: 1-9)");
70          }
71          return () -> new Bzip2Compressor(blockSizeMultiplier);
72      }
73  
74      /**
75       * Current state of stream.
76       */
77      private enum State {
78          INIT,
79          INIT_BLOCK,
80          WRITE_DATA,
81          CLOSE_BLOCK
82      }
83  
84      private State currentState = State.INIT;
85  
86      /**
87       * A writer that provides bit-level writes.
88       */
89      private final Bzip2BitWriter writer = new Bzip2BitWriter();
90  
91      /**
92       * The declared maximum block size of the stream (before final run-length decoding).
93       */
94      private final int streamBlockSize;
95  
96      /**
97       * The merged CRC of all blocks compressed so far.
98       */
99      private int streamCRC;
100 
101     /**
102      * The compressor for the current block.
103      */
104     private Bzip2BlockCompressor blockCompressor;
105 
106     private enum CompressorState {
107         PROCESSING,
108         FINISHED,
109         CLOSED
110     }
111 
112     private CompressorState compressorState = CompressorState.PROCESSING;
113 
114     @Override
115     public Buffer compress(Buffer in, BufferAllocator allocator) throws CompressionException {
116         switch (compressorState) {
117             case CLOSED:
118                 throw new CompressionException("Compressor closed");
119             case FINISHED:
120                 return allocator.allocate(0);
121             case PROCESSING:
122                 return compressData(in, allocator);
123             default:
124                 throw new IllegalStateException();
125         }
126     }
127 
128     private Buffer compressData(Buffer in, BufferAllocator allocator) {
129         Buffer out = allocator.allocate(256);
130         for (;;) {
131             switch (currentState) {
132                 case INIT:
133                     out.ensureWritable(4);
134                     out.writeMedium(MAGIC_NUMBER);
135                     out.writeByte((byte) ('0' + streamBlockSize / BASE_BLOCK_SIZE));
136                     currentState = State.INIT_BLOCK;
137                     // fall through
138                 case INIT_BLOCK:
139                     blockCompressor = new Bzip2BlockCompressor(writer, streamBlockSize);
140                     currentState = State.WRITE_DATA;
141                     // fall through
142                 case WRITE_DATA:
143                     if (in.readableBytes() == 0) {
144                         return out;
145                     }
146                     Bzip2BlockCompressor blockCompressor = this.blockCompressor;
147                     final int length = Math.min(in.readableBytes(), blockCompressor.availableSize());
148                     final int bytesWritten = blockCompressor.write(in, in.readerOffset(), length);
149                     in.skipReadableBytes(bytesWritten);
150                     if (!blockCompressor.isFull()) {
151                         if (in.readableBytes() > 0) {
152                             break;
153                         } else {
154                             return out;
155                         }
156                     }
157                     currentState = State.CLOSE_BLOCK;
158                     // fall through
159                 case CLOSE_BLOCK:
160                     closeBlock(out);
161                     currentState = State.INIT_BLOCK;
162                     break;
163                 default:
164                     throw new IllegalStateException();
165             }
166         }
167     }
168 
169     /**
170      * Close current block and update {@link #streamCRC}.
171      */
172     private void closeBlock(Buffer out) {
173         final Bzip2BlockCompressor blockCompressor = this.blockCompressor;
174         if (!blockCompressor.isEmpty()) {
175             blockCompressor.close(out);
176             final int blockCRC = blockCompressor.crc();
177             streamCRC = (streamCRC << 1 | streamCRC >>> 31) ^ blockCRC;
178         }
179     }
180 
181     @Override
182     public Buffer finish(BufferAllocator allocator) {
183         switch (compressorState) {
184             case CLOSED:
185                 throw new CompressionException("Compressor closed");
186             case FINISHED:
187                 return allocator.allocate(0);
188             case PROCESSING:
189                 compressorState = CompressorState.FINISHED;
190                 final Buffer footer = allocator.allocate(256);
191                 try {
192                     closeBlock(footer);
193 
194                     final int streamCRC = this.streamCRC;
195                     final Bzip2BitWriter writer = this.writer;
196                     try {
197                         writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_1);
198                         writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_2);
199                         writer.writeInt(footer, streamCRC);
200                         writer.flush(footer);
201                     } finally {
202                         blockCompressor = null;
203                     }
204                     return footer;
205                 } catch (Throwable cause) {
206                     footer.close();
207                     throw cause;
208                 }
209             default:
210                 throw new IllegalStateException();
211         }
212     }
213 
214     @Override
215     public boolean isFinished() {
216         return compressorState != CompressorState.PROCESSING;
217     }
218 
219     @Override
220     public boolean isClosed() {
221         return compressorState == CompressorState.CLOSED;
222     }
223 
224     @Override
225     public void close() {
226         compressorState = CompressorState.CLOSED;
227     }
228 }