1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.compression;
18
19 import com.aayushatharva.brotli4j.decoder.DecoderJNI;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.handler.codec.ByteToMessageDecoder;
23 import io.netty.util.internal.ObjectUtil;
24
25 import java.nio.ByteBuffer;
26 import java.util.List;
27
28
29
30
31
32
33 public final class BrotliDecoder extends ByteToMessageDecoder {
34
35 private static final int DEFAULT_MAX_FORWARD_BYTES = CompressionUtil.DEFAULT_MAX_FORWARD_BYTES;
36 private static final int DEFAULT_INPUT_BUFFER_SIZE = 8 * 1024;
37
38 private enum State {
39 DONE, NEEDS_MORE_INPUT, ERROR
40 }
41
42 static {
43 try {
44 Brotli.ensureAvailability();
45 } catch (Throwable throwable) {
46 throw new ExceptionInInitializerError(throwable);
47 }
48 }
49
50 private final int inputBufferSize;
51 private final int outputBufferSize;
52 private DecoderJNI.Wrapper decoder;
53 private boolean destroyed;
54 private boolean needsRead;
55 private ByteBuf accumBuffer;
56
57
58
59
60 public BrotliDecoder() {
61 this(DEFAULT_INPUT_BUFFER_SIZE);
62 }
63
64
65
66
67
68 public BrotliDecoder(int inputBufferSize) {
69 this(inputBufferSize == 0 ? DEFAULT_INPUT_BUFFER_SIZE : inputBufferSize, DEFAULT_MAX_FORWARD_BYTES);
70 }
71
72
73
74
75
76
77
78 public BrotliDecoder(int inputBufferSize, int outputBufferSize) {
79 this.inputBufferSize = ObjectUtil.checkPositive(inputBufferSize, "inputBufferSize");
80 this.outputBufferSize = ObjectUtil.checkPositive(outputBufferSize, "outputBufferSize");
81 }
82
83 private void forwardOutput(ChannelHandlerContext ctx) {
84 ByteBuffer nativeBuffer = decoder.pull(outputBufferSize);
85
86 int remaining = nativeBuffer.remaining();
87 if (accumBuffer == null) {
88 accumBuffer = ctx.alloc().buffer(remaining);
89 }
90 accumBuffer.writeBytes(nativeBuffer);
91 needsRead = false;
92 if (accumBuffer.readableBytes() >= outputBufferSize) {
93 ctx.fireChannelRead(accumBuffer);
94 accumBuffer = null;
95 }
96 }
97
98 private void flushAccumBuffer(ChannelHandlerContext ctx) {
99 if (accumBuffer != null && accumBuffer.isReadable()) {
100 ctx.fireChannelRead(accumBuffer);
101 } else if (accumBuffer != null) {
102 accumBuffer.release();
103 }
104 accumBuffer = null;
105 }
106
107 private State decompress(ChannelHandlerContext ctx, ByteBuf input) {
108 for (;;) {
109 switch (decoder.getStatus()) {
110 case DONE:
111 return State.DONE;
112
113 case OK:
114 decoder.push(0);
115 break;
116
117 case NEEDS_MORE_INPUT:
118 while (decoder.hasOutput()) {
119 forwardOutput(ctx);
120 }
121
122 if (!input.isReadable()) {
123 return State.NEEDS_MORE_INPUT;
124 }
125
126 ByteBuffer decoderInputBuffer = decoder.getInputBuffer();
127 decoderInputBuffer.clear();
128 int readBytes = readBytes(input, decoderInputBuffer);
129 decoder.push(readBytes);
130 break;
131
132 case NEEDS_MORE_OUTPUT:
133 forwardOutput(ctx);
134 break;
135
136 default:
137 return State.ERROR;
138 }
139 }
140 }
141
142 private static int readBytes(ByteBuf in, ByteBuffer dest) {
143 int limit = Math.min(in.readableBytes(), dest.remaining());
144 ByteBuffer slice = dest.slice();
145 slice.limit(limit);
146 in.readBytes(slice);
147 dest.position(dest.position() + limit);
148 return limit;
149 }
150
151 @Override
152 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
153 decoder = new DecoderJNI.Wrapper(inputBufferSize);
154 }
155
156 @Override
157 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
158 needsRead = true;
159 if (destroyed) {
160
161 in.skipBytes(in.readableBytes());
162 return;
163 }
164
165 if (!in.isReadable()) {
166 return;
167 }
168
169 try {
170 State state = decompress(ctx, in);
171 if (state == State.DONE) {
172 destroy();
173 } else if (state == State.ERROR) {
174 throw new DecompressionException("Brotli stream corrupted");
175 }
176 } catch (Exception e) {
177 destroy();
178 throw e;
179 } finally {
180 flushAccumBuffer(ctx);
181 }
182 }
183
184 private void destroy() {
185 if (!destroyed) {
186 destroyed = true;
187 decoder.destroy();
188 }
189 }
190
191 @Override
192 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
193 try {
194 destroy();
195 } finally {
196 super.handlerRemoved0(ctx);
197 }
198 }
199
200 @Override
201 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
202 try {
203 destroy();
204 } finally {
205 super.channelInactive(ctx);
206 }
207 }
208
209 @Override
210 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
211
212 discardSomeReadBytes();
213
214 if (needsRead && !ctx.channel().config().isAutoRead()) {
215 ctx.read();
216 }
217 ctx.fireChannelReadComplete();
218 }
219 }