View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import com.aayushatharva.brotli4j.encoder.BrotliEncoderChannel;
19  import com.aayushatharva.brotli4j.encoder.Encoder;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.handler.codec.MessageToByteEncoder;
28  import io.netty.util.AttributeKey;
29  import io.netty.util.ReferenceCountUtil;
30  import io.netty.util.internal.ObjectUtil;
31  
32  import java.io.IOException;
33  import java.nio.ByteBuffer;
34  import java.nio.channels.ClosedChannelException;
35  import java.nio.channels.WritableByteChannel;
36  
37  /**
38   * Compress a {@link ByteBuf} with the Brotli compression.
39   * <p>
40   * See <a href="https://github.com/google/brotli">brotli</a>.
41   */
42  @ChannelHandler.Sharable
43  public final class BrotliEncoder extends MessageToByteEncoder<ByteBuf> {
44  
45      private static final AttributeKey<Writer> ATTR = AttributeKey.valueOf("BrotliEncoderWriter");
46  
47      private final Encoder.Parameters parameters;
48      private final boolean isSharable;
49      private Writer writer;
50  
51      /**
52       * Create a new {@link BrotliEncoder} Instance with {@link BrotliOptions#DEFAULT}
53       * and {@link #isSharable()} set to {@code true}
54       */
55      public BrotliEncoder() {
56          this(BrotliOptions.DEFAULT);
57      }
58  
59      /**
60       * Create a new {@link BrotliEncoder} Instance
61       *
62       * @param brotliOptions {@link BrotliOptions} to use and
63       *                      {@link #isSharable()} set to {@code true}
64       */
65      public BrotliEncoder(BrotliOptions brotliOptions) {
66          this(brotliOptions.parameters());
67      }
68  
69      /**
70       * Create a new {@link BrotliEncoder} Instance
71       * and {@link #isSharable()} set to {@code true}
72       *
73       * @param parameters {@link Encoder.Parameters} to use
74       */
75      public BrotliEncoder(Encoder.Parameters parameters) {
76          this(parameters, true);
77      }
78  
79      /**
80       * <p>
81       * Create a new {@link BrotliEncoder} Instance and specify
82       * whether this instance will be shared with multiple pipelines or not.
83       * </p>
84       *
85       * If {@link #isSharable()} is true then on {@link #handlerAdded(ChannelHandlerContext)} call,
86       * a new {@link Writer} will create, and it will be mapped using {@link Channel#attr(AttributeKey)}
87       * so {@link BrotliEncoder} can be shared with multiple pipelines. This works fine but there on every
88       * {@link #encode(ChannelHandlerContext, ByteBuf, ByteBuf)} call, we have to get the {@link Writer} associated
89       * with the appropriate channel. And this will add a overhead. So it is recommended to set {@link #isSharable()}
90       * to {@code false} and create new {@link BrotliEncoder} instance for every pipeline.
91       *
92       * @param parameters {@link Encoder.Parameters} to use
93       * @param isSharable Set to {@code true} if this instance is shared else set to {@code false}
94       */
95      public BrotliEncoder(Encoder.Parameters parameters, boolean isSharable) {
96          this.parameters = ObjectUtil.checkNotNull(parameters, "Parameters");
97          this.isSharable = isSharable;
98      }
99  
100     @Override
101     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
102         Writer writer = new Writer(parameters, ctx);
103         if (isSharable) {
104             ctx.channel().attr(ATTR).set(writer);
105         } else {
106             this.writer = writer;
107         }
108         super.handlerAdded(ctx);
109     }
110 
111     @Override
112     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
113         finish(ctx);
114         super.handlerRemoved(ctx);
115     }
116 
117     @Override
118     protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
119         // NO-OP
120     }
121 
122     @Override
123     protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) throws Exception {
124         if (!msg.isReadable()) {
125             return Unpooled.EMPTY_BUFFER;
126         }
127 
128         Writer writer;
129         if (isSharable) {
130             writer = ctx.channel().attr(ATTR).get();
131         } else {
132             writer = this.writer;
133         }
134 
135         // If Writer is 'null' then Writer is not open.
136         if (writer == null) {
137             return Unpooled.EMPTY_BUFFER;
138         } else {
139             writer.encode(msg, preferDirect);
140             return writer.writableBuffer;
141         }
142     }
143 
144     @Override
145     public boolean isSharable() {
146         return isSharable;
147     }
148 
149     /**
150      * Finish the encoding, close streams and write final {@link ByteBuf} to the channel.
151      *
152      * @param ctx {@link ChannelHandlerContext} which we want to close
153      * @throws IOException If an error occurred during closure
154      */
155     public void finish(ChannelHandlerContext ctx) throws IOException {
156         finishEncode(ctx, ctx.newPromise());
157     }
158 
159     private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
160         Writer writer;
161 
162         if (isSharable) {
163             writer = ctx.channel().attr(ATTR).getAndSet(null);
164         } else {
165             writer = this.writer;
166         }
167 
168         if (writer != null) {
169             writer.close();
170             this.writer = null;
171         }
172         return promise;
173     }
174 
175     @Override
176     public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
177         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
178         EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
179     }
180 
181     /**
182      * {@link Writer} is the implementation of {@link WritableByteChannel} which encodes
183      * Brotli data and stores it into {@link ByteBuf}.
184      */
185     private static final class Writer implements WritableByteChannel {
186 
187         private ByteBuf writableBuffer;
188         private final BrotliEncoderChannel brotliEncoderChannel;
189         private final ChannelHandlerContext ctx;
190         private boolean isClosed;
191 
192         private Writer(Encoder.Parameters parameters, ChannelHandlerContext ctx) throws IOException {
193             brotliEncoderChannel = new BrotliEncoderChannel(this, parameters);
194             this.ctx = ctx;
195         }
196 
197         private void encode(ByteBuf msg, boolean preferDirect) throws Exception {
198             try {
199                 allocate(preferDirect);
200 
201                 // Compress data and flush it into Buffer.
202                 //
203                 // As soon as we call flush, Encoder will be triggered to write encoded
204                 // data into WritableByteChannel.
205                 //
206                 // A race condition will not arise because one flush call to encoder will result
207                 // in only 1 call at `write(ByteBuffer)`.
208                 ByteBuffer nioBuffer = CompressionUtil.safeReadableNioBuffer(msg);
209                 int position = nioBuffer.position();
210                 brotliEncoderChannel.write(nioBuffer);
211                 msg.skipBytes(nioBuffer.position() - position);
212                 brotliEncoderChannel.flush();
213             } catch (Exception e) {
214                 ReferenceCountUtil.release(msg);
215                 throw e;
216             }
217         }
218 
219         private void allocate(boolean preferDirect) {
220             if (preferDirect) {
221                 writableBuffer = ctx.alloc().ioBuffer();
222             } else {
223                 writableBuffer = ctx.alloc().buffer();
224             }
225         }
226 
227         @Override
228         public int write(ByteBuffer src) throws IOException {
229             if (!isOpen()) {
230                 throw new ClosedChannelException();
231             }
232 
233             return writableBuffer.writeBytes(src).readableBytes();
234         }
235 
236         @Override
237         public boolean isOpen() {
238             return !isClosed;
239         }
240 
241         @Override
242         public void close() {
243             final ChannelPromise promise = ctx.newPromise();
244 
245             ctx.executor().execute(new Runnable() {
246                 @Override
247                 public void run() {
248                     try {
249                         finish(promise);
250                     } catch (IOException ex) {
251                         promise.setFailure(new IllegalStateException("Failed to finish encoding", ex));
252                     }
253                 }
254             });
255         }
256 
257         public void finish(final ChannelPromise promise) throws IOException {
258             if (!isClosed) {
259                 // Allocate a buffer and write last pending data.
260                 allocate(true);
261 
262                 try {
263                     brotliEncoderChannel.close();
264                     isClosed = true;
265                 } catch (Exception ex) {
266                     promise.setFailure(ex);
267 
268                     // Since we have already allocated Buffer for close operation,
269                     // we will release that buffer to prevent memory leak.
270                     ReferenceCountUtil.release(writableBuffer);
271                     return;
272                 }
273 
274                 ctx.writeAndFlush(writableBuffer, promise);
275             }
276         }
277     }
278 }