1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import com.aayushatharva.brotli4j.encoder.BrotliEncoderChannel;
19 import com.aayushatharva.brotli4j.encoder.Encoder;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.ChannelHandler;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.handler.codec.MessageToByteEncoder;
28 import io.netty.util.AttributeKey;
29 import io.netty.util.ReferenceCountUtil;
30 import io.netty.util.internal.ObjectUtil;
31
32 import java.io.IOException;
33 import java.nio.ByteBuffer;
34 import java.nio.channels.ClosedChannelException;
35 import java.nio.channels.WritableByteChannel;
36
37
38
39
40
41
42 @ChannelHandler.Sharable
43 public final class BrotliEncoder extends MessageToByteEncoder<ByteBuf> {
44
45 private static final AttributeKey<Writer> ATTR = AttributeKey.valueOf("BrotliEncoderWriter");
46
47 private final Encoder.Parameters parameters;
48 private final boolean isSharable;
49 private Writer writer;
50
51
52
53
54
55 public BrotliEncoder() {
56 this(BrotliOptions.DEFAULT);
57 }
58
59
60
61
62
63
64
65 public BrotliEncoder(BrotliOptions brotliOptions) {
66 this(brotliOptions.parameters());
67 }
68
69
70
71
72
73
74
75 public BrotliEncoder(Encoder.Parameters parameters) {
76 this(parameters, true);
77 }
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 public BrotliEncoder(Encoder.Parameters parameters, boolean isSharable) {
96 super(ByteBuf.class);
97 this.parameters = ObjectUtil.checkNotNull(parameters, "Parameters");
98 this.isSharable = isSharable;
99 }
100
101 @Override
102 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
103 Writer writer = new Writer(parameters, ctx);
104 if (isSharable) {
105 ctx.channel().attr(ATTR).set(writer);
106 } else {
107 this.writer = writer;
108 }
109 super.handlerAdded(ctx);
110 }
111
112 @Override
113 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
114 finish(ctx);
115 super.handlerRemoved(ctx);
116 }
117
118 @Override
119 protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
120
121 }
122
123 @Override
124 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) throws Exception {
125 if (!msg.isReadable()) {
126 return Unpooled.EMPTY_BUFFER;
127 }
128
129 Writer writer;
130 if (isSharable) {
131 writer = ctx.channel().attr(ATTR).get();
132 } else {
133 writer = this.writer;
134 }
135
136
137 if (writer == null) {
138 return Unpooled.EMPTY_BUFFER;
139 } else {
140 writer.encode(msg, preferDirect);
141 return writer.writableBuffer;
142 }
143 }
144
145 @Override
146 public boolean isSharable() {
147 return isSharable;
148 }
149
150
151
152
153
154
155
156 public void finish(ChannelHandlerContext ctx) throws IOException {
157 finishEncode(ctx, ctx.newPromise());
158 }
159
160 private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
161 Writer writer;
162
163 if (isSharable) {
164 writer = ctx.channel().attr(ATTR).getAndSet(null);
165 } else {
166 writer = this.writer;
167 }
168
169 if (writer != null) {
170 writer.close();
171 this.writer = null;
172 }
173 return promise;
174 }
175
176 @Override
177 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
178 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
179 EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
180 }
181
182
183
184
185
186 private static final class Writer implements WritableByteChannel {
187
188 private ByteBuf writableBuffer;
189 private final BrotliEncoderChannel brotliEncoderChannel;
190 private final ChannelHandlerContext ctx;
191 private boolean isClosed;
192
193 private Writer(Encoder.Parameters parameters, ChannelHandlerContext ctx) throws IOException {
194 brotliEncoderChannel = new BrotliEncoderChannel(this, parameters);
195 this.ctx = ctx;
196 }
197
198 private void encode(ByteBuf msg, boolean preferDirect) throws Exception {
199 try {
200 allocate(preferDirect);
201
202
203
204
205
206
207
208
209 ByteBuffer nioBuffer = CompressionUtil.safeReadableNioBuffer(msg);
210 int position = nioBuffer.position();
211 brotliEncoderChannel.write(nioBuffer);
212 msg.skipBytes(nioBuffer.position() - position);
213 brotliEncoderChannel.flush();
214 } catch (Exception e) {
215 ReferenceCountUtil.release(msg);
216 throw e;
217 }
218 }
219
220 private void allocate(boolean preferDirect) {
221 if (preferDirect) {
222 writableBuffer = ctx.alloc().ioBuffer();
223 } else {
224 writableBuffer = ctx.alloc().buffer();
225 }
226 }
227
228 @Override
229 public int write(ByteBuffer src) throws IOException {
230 if (!isOpen()) {
231 throw new ClosedChannelException();
232 }
233
234 return writableBuffer.writeBytes(src).readableBytes();
235 }
236
237 @Override
238 public boolean isOpen() {
239 return !isClosed;
240 }
241
242 @Override
243 public void close() {
244 final ChannelPromise promise = ctx.newPromise();
245
246 ctx.executor().execute(new Runnable() {
247 @Override
248 public void run() {
249 try {
250 finish(promise);
251 } catch (IOException ex) {
252 promise.setFailure(new IllegalStateException("Failed to finish encoding", ex));
253 }
254 }
255 });
256 }
257
258 public void finish(final ChannelPromise promise) throws IOException {
259 if (!isClosed) {
260
261 allocate(true);
262
263 try {
264 brotliEncoderChannel.close();
265 isClosed = true;
266 } catch (Exception ex) {
267 promise.setFailure(ex);
268
269
270
271 ReferenceCountUtil.release(writableBuffer);
272 return;
273 }
274
275 ctx.writeAndFlush(writableBuffer, promise);
276 }
277 }
278 }
279 }