1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.ChannelPromise;
23 import io.netty.handler.codec.MessageToByteEncoder;
24 import io.netty.util.concurrent.EventExecutor;
25 import io.netty.util.concurrent.PromiseNotifier;
26
27 import static io.netty.handler.codec.compression.Bzip2Constants.BASE_BLOCK_SIZE;
28 import static io.netty.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_1;
29 import static io.netty.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_2;
30 import static io.netty.handler.codec.compression.Bzip2Constants.MAGIC_NUMBER;
31 import static io.netty.handler.codec.compression.Bzip2Constants.MAX_BLOCK_SIZE;
32 import static io.netty.handler.codec.compression.Bzip2Constants.MIN_BLOCK_SIZE;
33
34
35
36
37
38
39 public class Bzip2Encoder extends MessageToByteEncoder<ByteBuf> {
40
41
42
43 private enum State {
44 INIT,
45 INIT_BLOCK,
46 WRITE_DATA,
47 CLOSE_BLOCK
48 }
49
50 private State currentState = State.INIT;
51
52
53
54
55 private final Bzip2BitWriter writer = new Bzip2BitWriter();
56
57
58
59
60 private final int streamBlockSize;
61
62
63
64
65 private int streamCRC;
66
67
68
69
70 private Bzip2BlockCompressor blockCompressor;
71
72
73
74
75 private volatile boolean finished;
76
77
78
79
80 private volatile ChannelHandlerContext ctx;
81
82
83
84
85 public Bzip2Encoder() {
86 this(MAX_BLOCK_SIZE);
87 }
88
89
90
91
92
93
94
95
96 public Bzip2Encoder(final int blockSizeMultiplier) {
97 super(ByteBuf.class);
98 if (blockSizeMultiplier < MIN_BLOCK_SIZE || blockSizeMultiplier > MAX_BLOCK_SIZE) {
99 throw new IllegalArgumentException(
100 "blockSizeMultiplier: " + blockSizeMultiplier + " (expected: 1-9)");
101 }
102 streamBlockSize = blockSizeMultiplier * BASE_BLOCK_SIZE;
103 }
104
105 @Override
106 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
107 if (finished) {
108 out.writeBytes(in);
109 return;
110 }
111
112 for (;;) {
113 switch (currentState) {
114 case INIT:
115 out.ensureWritable(4);
116 out.writeMedium(MAGIC_NUMBER);
117 out.writeByte('0' + streamBlockSize / BASE_BLOCK_SIZE);
118 currentState = State.INIT_BLOCK;
119
120 case INIT_BLOCK:
121 blockCompressor = new Bzip2BlockCompressor(writer, streamBlockSize);
122 currentState = State.WRITE_DATA;
123
124 case WRITE_DATA:
125 if (!in.isReadable()) {
126 return;
127 }
128 Bzip2BlockCompressor blockCompressor = this.blockCompressor;
129 final int length = Math.min(in.readableBytes(), blockCompressor.availableSize());
130 final int bytesWritten = blockCompressor.write(in, in.readerIndex(), length);
131 in.skipBytes(bytesWritten);
132 if (!blockCompressor.isFull()) {
133 if (in.isReadable()) {
134 break;
135 } else {
136 return;
137 }
138 }
139 currentState = State.CLOSE_BLOCK;
140
141 case CLOSE_BLOCK:
142 closeBlock(out);
143 currentState = State.INIT_BLOCK;
144 break;
145 default:
146 throw new IllegalStateException();
147 }
148 }
149 }
150
151
152
153
154 private void closeBlock(ByteBuf out) {
155 final Bzip2BlockCompressor blockCompressor = this.blockCompressor;
156 if (!blockCompressor.isEmpty()) {
157 blockCompressor.close(out);
158 final int blockCRC = blockCompressor.crc();
159 streamCRC = (streamCRC << 1 | streamCRC >>> 31) ^ blockCRC;
160 }
161 }
162
163
164
165
166 public boolean isClosed() {
167 return finished;
168 }
169
170
171
172
173
174
175 public ChannelFuture close() {
176 return close(ctx().newPromise());
177 }
178
179
180
181
182
183
184 public ChannelFuture close(final ChannelPromise promise) {
185 ChannelHandlerContext ctx = ctx();
186 EventExecutor executor = ctx.executor();
187 if (executor.inEventLoop()) {
188 return finishEncode(ctx, promise);
189 } else {
190 executor.execute(new Runnable() {
191 @Override
192 public void run() {
193 ChannelFuture f = finishEncode(ctx(), promise);
194 PromiseNotifier.cascade(f, promise);
195 }
196 });
197 return promise;
198 }
199 }
200
201 @Override
202 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
203 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
204 EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
205 }
206
207 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
208 if (finished) {
209 promise.setSuccess();
210 return promise;
211 }
212 finished = true;
213
214 final ByteBuf footer = ctx.alloc().buffer();
215 closeBlock(footer);
216
217 final int streamCRC = this.streamCRC;
218 final Bzip2BitWriter writer = this.writer;
219 try {
220 writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_1);
221 writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_2);
222 writer.writeInt(footer, streamCRC);
223 writer.flush(footer);
224 } finally {
225 blockCompressor = null;
226 }
227 return ctx.writeAndFlush(footer, promise);
228 }
229
230 private ChannelHandlerContext ctx() {
231 ChannelHandlerContext ctx = this.ctx;
232 if (ctx == null) {
233 throw new IllegalStateException("not added to a pipeline");
234 }
235 return ctx;
236 }
237
238 @Override
239 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
240 this.ctx = ctx;
241 }
242 }