View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import com.aayushatharva.brotli4j.encoder.BrotliEncoderChannel;
19  import com.aayushatharva.brotli4j.encoder.Encoder;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelPromise;
27  import io.netty.handler.codec.MessageToByteEncoder;
28  import io.netty.util.AttributeKey;
29  import io.netty.util.ReferenceCountUtil;
30  import io.netty.util.internal.ObjectUtil;
31  
32  import java.io.IOException;
33  import java.nio.ByteBuffer;
34  import java.nio.channels.ClosedChannelException;
35  import java.nio.channels.WritableByteChannel;
36  
37  /**
38   * Compress a {@link ByteBuf} with the Brotli compression.
39   * <p>
40   * See <a href="https://github.com/google/brotli">brotli</a>.
41   */
42  @ChannelHandler.Sharable
43  public final class BrotliEncoder extends MessageToByteEncoder<ByteBuf> {
44  
45      private static final AttributeKey<Writer> ATTR = AttributeKey.valueOf("BrotliEncoderWriter");
46  
47      private final Encoder.Parameters parameters;
48      private final boolean isSharable;
49      private Writer writer;
50  
51      /**
52       * Create a new {@link BrotliEncoder} Instance with {@link BrotliOptions#DEFAULT}
53       * and {@link #isSharable()} set to {@code true}
54       */
55      public BrotliEncoder() {
56          this(BrotliOptions.DEFAULT);
57      }
58  
59      /**
60       * Create a new {@link BrotliEncoder} Instance
61       *
62       * @param brotliOptions {@link BrotliOptions} to use and
63       *                      {@link #isSharable()} set to {@code true}
64       */
65      public BrotliEncoder(BrotliOptions brotliOptions) {
66          this(brotliOptions.parameters());
67      }
68  
69      /**
70       * Create a new {@link BrotliEncoder} Instance
71       * and {@link #isSharable()} set to {@code true}
72       *
73       * @param parameters {@link Encoder.Parameters} to use
74       */
75      public BrotliEncoder(Encoder.Parameters parameters) {
76          this(parameters, true);
77      }
78  
79      /**
80       * <p>
81       * Create a new {@link BrotliEncoder} Instance and specify
82       * whether this instance will be shared with multiple pipelines or not.
83       * </p>
84       *
85       * If {@link #isSharable()} is true then on {@link #handlerAdded(ChannelHandlerContext)} call,
86       * a new {@link Writer} will create, and it will be mapped using {@link Channel#attr(AttributeKey)}
87       * so {@link BrotliEncoder} can be shared with multiple pipelines. This works fine but there on every
88       * {@link #encode(ChannelHandlerContext, ByteBuf, ByteBuf)} call, we have to get the {@link Writer} associated
89       * with the appropriate channel. And this will add a overhead. So it is recommended to set {@link #isSharable()}
90       * to {@code false} and create new {@link BrotliEncoder} instance for every pipeline.
91       *
92       * @param parameters {@link Encoder.Parameters} to use
93       * @param isSharable Set to {@code true} if this instance is shared else set to {@code false}
94       */
95      public BrotliEncoder(Encoder.Parameters parameters, boolean isSharable) {
96          super(ByteBuf.class);
97          this.parameters = ObjectUtil.checkNotNull(parameters, "Parameters");
98          this.isSharable = isSharable;
99      }
100 
101     @Override
102     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
103         Writer writer = new Writer(parameters, ctx);
104         if (isSharable) {
105             ctx.channel().attr(ATTR).set(writer);
106         } else {
107             this.writer = writer;
108         }
109         super.handlerAdded(ctx);
110     }
111 
112     @Override
113     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
114         finish(ctx);
115         super.handlerRemoved(ctx);
116     }
117 
118     @Override
119     protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
120         // NO-OP
121     }
122 
123     @Override
124     protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) throws Exception {
125         if (!msg.isReadable()) {
126             return Unpooled.EMPTY_BUFFER;
127         }
128 
129         Writer writer;
130         if (isSharable) {
131             writer = ctx.channel().attr(ATTR).get();
132         } else {
133             writer = this.writer;
134         }
135 
136         // If Writer is 'null' then Writer is not open.
137         if (writer == null) {
138             return Unpooled.EMPTY_BUFFER;
139         } else {
140             writer.encode(msg, preferDirect);
141             return writer.writableBuffer;
142         }
143     }
144 
145     @Override
146     public boolean isSharable() {
147         return isSharable;
148     }
149 
150     /**
151      * Finish the encoding, close streams and write final {@link ByteBuf} to the channel.
152      *
153      * @param ctx {@link ChannelHandlerContext} which we want to close
154      * @throws IOException If an error occurred during closure
155      */
156     public void finish(ChannelHandlerContext ctx) throws IOException {
157         finishEncode(ctx, ctx.newPromise());
158     }
159 
160     private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
161         Writer writer;
162 
163         if (isSharable) {
164             writer = ctx.channel().attr(ATTR).getAndSet(null);
165         } else {
166             writer = this.writer;
167         }
168 
169         if (writer != null) {
170             writer.close();
171             this.writer = null;
172         }
173         return promise;
174     }
175 
176     @Override
177     public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
178         ChannelFuture f = finishEncode(ctx, ctx.newPromise());
179         EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
180     }
181 
182     /**
183      * {@link Writer} is the implementation of {@link WritableByteChannel} which encodes
184      * Brotli data and stores it into {@link ByteBuf}.
185      */
186     private static final class Writer implements WritableByteChannel {
187 
188         private ByteBuf writableBuffer;
189         private final BrotliEncoderChannel brotliEncoderChannel;
190         private final ChannelHandlerContext ctx;
191         private boolean isClosed;
192 
193         private Writer(Encoder.Parameters parameters, ChannelHandlerContext ctx) throws IOException {
194             brotliEncoderChannel = new BrotliEncoderChannel(this, parameters);
195             this.ctx = ctx;
196         }
197 
198         private void encode(ByteBuf msg, boolean preferDirect) throws Exception {
199             try {
200                 allocate(preferDirect);
201 
202                 // Compress data and flush it into Buffer.
203                 //
204                 // As soon as we call flush, Encoder will be triggered to write encoded
205                 // data into WritableByteChannel.
206                 //
207                 // A race condition will not arise because one flush call to encoder will result
208                 // in only 1 call at `write(ByteBuffer)`.
209                 ByteBuffer nioBuffer = CompressionUtil.safeReadableNioBuffer(msg);
210                 int position = nioBuffer.position();
211                 brotliEncoderChannel.write(nioBuffer);
212                 msg.skipBytes(nioBuffer.position() - position);
213                 brotliEncoderChannel.flush();
214             } catch (Exception e) {
215                 ReferenceCountUtil.release(msg);
216                 throw e;
217             }
218         }
219 
220         private void allocate(boolean preferDirect) {
221             if (preferDirect) {
222                 writableBuffer = ctx.alloc().ioBuffer();
223             } else {
224                 writableBuffer = ctx.alloc().buffer();
225             }
226         }
227 
228         @Override
229         public int write(ByteBuffer src) throws IOException {
230             if (!isOpen()) {
231                 throw new ClosedChannelException();
232             }
233 
234             return writableBuffer.writeBytes(src).readableBytes();
235         }
236 
237         @Override
238         public boolean isOpen() {
239             return !isClosed;
240         }
241 
242         @Override
243         public void close() {
244             final ChannelPromise promise = ctx.newPromise();
245 
246             ctx.executor().execute(new Runnable() {
247                 @Override
248                 public void run() {
249                     try {
250                         finish(promise);
251                     } catch (IOException ex) {
252                         promise.setFailure(new IllegalStateException("Failed to finish encoding", ex));
253                     }
254                 }
255             });
256         }
257 
258         public void finish(final ChannelPromise promise) throws IOException {
259             if (!isClosed) {
260                 // Allocate a buffer and write last pending data.
261                 allocate(true);
262 
263                 try {
264                     brotliEncoderChannel.close();
265                     isClosed = true;
266                 } catch (Exception ex) {
267                     promise.setFailure(ex);
268 
269                     // Since we have already allocated Buffer for close operation,
270                     // we will release that buffer to prevent memory leak.
271                     ReferenceCountUtil.release(writableBuffer);
272                     return;
273                 }
274 
275                 ctx.writeAndFlush(writableBuffer, promise);
276             }
277         }
278     }
279 }