1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.compression;
18
19 import com.aayushatharva.brotli4j.decoder.DecoderJNI;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.handler.codec.ByteToMessageDecoder;
23 import io.netty.util.internal.ObjectUtil;
24
25 import java.nio.ByteBuffer;
26 import java.util.List;
27
28
29
30
31
32
33 public final class BrotliDecoder extends ByteToMessageDecoder {
34
35 private static final int DEFAULT_MAX_FORWARD_BYTES = CompressionUtil.DEFAULT_MAX_FORWARD_BYTES;
36
37 private enum State {
38 DONE, NEEDS_MORE_INPUT, ERROR
39 }
40
41 static {
42 try {
43 Brotli.ensureAvailability();
44 } catch (Throwable throwable) {
45 throw new ExceptionInInitializerError(throwable);
46 }
47 }
48
49 private final int inputBufferSize;
50 private DecoderJNI.Wrapper decoder;
51 private boolean destroyed;
52 private boolean needsRead;
53 private ByteBuf accumBuffer;
54
55
56
57
58 public BrotliDecoder() {
59 this(8 * 1024);
60 }
61
62
63
64
65
66 public BrotliDecoder(int inputBufferSize) {
67 this.inputBufferSize = ObjectUtil.checkPositive(inputBufferSize, "inputBufferSize");
68 }
69
70 private void forwardOutput(ChannelHandlerContext ctx) {
71 ByteBuffer nativeBuffer = decoder.pull();
72
73 int remaining = nativeBuffer.remaining();
74 if (accumBuffer == null) {
75 accumBuffer = ctx.alloc().buffer(remaining);
76 }
77 accumBuffer.writeBytes(nativeBuffer);
78 needsRead = false;
79 if (accumBuffer.readableBytes() >= DEFAULT_MAX_FORWARD_BYTES) {
80 ctx.fireChannelRead(accumBuffer);
81 accumBuffer = null;
82 }
83 }
84
85 private void flushAccumBuffer(ChannelHandlerContext ctx) {
86 if (accumBuffer != null && accumBuffer.isReadable()) {
87 ctx.fireChannelRead(accumBuffer);
88 } else if (accumBuffer != null) {
89 accumBuffer.release();
90 }
91 accumBuffer = null;
92 }
93
94 private State decompress(ChannelHandlerContext ctx, ByteBuf input) {
95 for (;;) {
96 switch (decoder.getStatus()) {
97 case DONE:
98 return State.DONE;
99
100 case OK:
101 decoder.push(0);
102 break;
103
104 case NEEDS_MORE_INPUT:
105 if (decoder.hasOutput()) {
106 forwardOutput(ctx);
107 }
108
109 if (!input.isReadable()) {
110 return State.NEEDS_MORE_INPUT;
111 }
112
113 ByteBuffer decoderInputBuffer = decoder.getInputBuffer();
114 decoderInputBuffer.clear();
115 int readBytes = readBytes(input, decoderInputBuffer);
116 decoder.push(readBytes);
117 break;
118
119 case NEEDS_MORE_OUTPUT:
120 forwardOutput(ctx);
121 break;
122
123 default:
124 return State.ERROR;
125 }
126 }
127 }
128
129 private static int readBytes(ByteBuf in, ByteBuffer dest) {
130 int limit = Math.min(in.readableBytes(), dest.remaining());
131 ByteBuffer slice = dest.slice();
132 slice.limit(limit);
133 in.readBytes(slice);
134 dest.position(dest.position() + limit);
135 return limit;
136 }
137
138 @Override
139 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
140 decoder = new DecoderJNI.Wrapper(inputBufferSize);
141 }
142
143 @Override
144 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
145 needsRead = true;
146 if (destroyed) {
147
148 in.skipBytes(in.readableBytes());
149 return;
150 }
151
152 if (!in.isReadable()) {
153 return;
154 }
155
156 try {
157 State state = decompress(ctx, in);
158 if (state == State.DONE) {
159 destroy();
160 } else if (state == State.ERROR) {
161 throw new DecompressionException("Brotli stream corrupted");
162 }
163 } catch (Exception e) {
164 destroy();
165 throw e;
166 } finally {
167 flushAccumBuffer(ctx);
168 }
169 }
170
171 private void destroy() {
172 if (!destroyed) {
173 destroyed = true;
174 decoder.destroy();
175 }
176 }
177
178 @Override
179 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
180 try {
181 destroy();
182 } finally {
183 super.handlerRemoved0(ctx);
184 }
185 }
186
187 @Override
188 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
189 try {
190 destroy();
191 } finally {
192 super.channelInactive(ctx);
193 }
194 }
195
196 @Override
197 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
198
199 discardSomeReadBytes();
200
201 if (needsRead && !ctx.channel().config().isAutoRead()) {
202 ctx.read();
203 }
204 ctx.fireChannelReadComplete();
205 }
206 }