View Javadoc
/*
 * Copyright 2021 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  
17  package io.netty.handler.codec.compression;
18  
19  import com.aayushatharva.brotli4j.decoder.DecoderJNI;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.handler.codec.ByteToMessageDecoder;
23  import io.netty.util.internal.ObjectUtil;
24  
25  import java.nio.ByteBuffer;
26  import java.util.List;
27  
28  /**
29   * Decompresses a {@link ByteBuf} encoded with the brotli format.
30   * <p>
31   * See <a href="https://github.com/google/brotli">brotli</a>.
32   */
33  public final class BrotliDecoder extends ByteToMessageDecoder {
34  
35      private static final int DEFAULT_MAX_FORWARD_BYTES = CompressionUtil.DEFAULT_MAX_FORWARD_BYTES;
36      private static final int DEFAULT_INPUT_BUFFER_SIZE = 8 * 1024;
37  
38      private enum State {
39          DONE, NEEDS_MORE_INPUT, ERROR
40      }
41  
42      static {
43          try {
44              Brotli.ensureAvailability();
45          } catch (Throwable throwable) {
46              throw new ExceptionInInitializerError(throwable);
47          }
48      }
49  
50      private final int inputBufferSize;
51      private final int outputBufferSize;
52      private DecoderJNI.Wrapper decoder;
53      private boolean destroyed;
54      private boolean needsRead;
55      private ByteBuf accumBuffer;
56  
57      /**
58       * Creates a new BrotliDecoder with a default 8kB input buffer
59       */
60      public BrotliDecoder() {
61          this(DEFAULT_INPUT_BUFFER_SIZE);
62      }
63  
64      /**
65       * Creates a new BrotliDecoder
66       * @param inputBufferSize desired size of the input buffer in bytes
67       */
68      public BrotliDecoder(int inputBufferSize) {
69          this(inputBufferSize == 0 ? DEFAULT_INPUT_BUFFER_SIZE : inputBufferSize, DEFAULT_MAX_FORWARD_BYTES);
70      }
71  
72      /**
73       * Creates a new BrotliDecoder
74       * @param inputBufferSize desired size of the input buffer in bytes
75       * @param outputBufferSize desired max size of the output buffer in bytes
76       *                         (produce multiple output buffers if exceeded)
77       */
78      public BrotliDecoder(int inputBufferSize, int outputBufferSize) {
79          this.inputBufferSize = ObjectUtil.checkPositive(inputBufferSize, "inputBufferSize");
80          this.outputBufferSize = ObjectUtil.checkPositive(outputBufferSize, "outputBufferSize");
81      }
82  
83      private void forwardOutput(ChannelHandlerContext ctx) {
84          ByteBuffer nativeBuffer = decoder.pull(outputBufferSize);
85          // nativeBuffer actually wraps brotli's internal buffer so we need to copy its content
86          int remaining = nativeBuffer.remaining();
87          if (accumBuffer == null) {
88              accumBuffer = ctx.alloc().buffer(remaining);
89          }
90          accumBuffer.writeBytes(nativeBuffer);
91          needsRead = false;
92          if (accumBuffer.readableBytes() >= outputBufferSize) {
93              ctx.fireChannelRead(accumBuffer);
94              accumBuffer = null;
95          }
96      }
97  
98      private void flushAccumBuffer(ChannelHandlerContext ctx) {
99          if (accumBuffer != null && accumBuffer.isReadable()) {
100             ctx.fireChannelRead(accumBuffer);
101         } else if (accumBuffer != null) {
102             accumBuffer.release();
103         }
104         accumBuffer = null;
105     }
106 
107     private State decompress(ChannelHandlerContext ctx, ByteBuf input) {
108         for (;;) {
109             switch (decoder.getStatus()) {
110                 case DONE:
111                     return State.DONE;
112 
113                 case OK:
114                     decoder.push(0);
115                     break;
116 
117                 case NEEDS_MORE_INPUT:
118                     while (decoder.hasOutput()) {
119                         forwardOutput(ctx);
120                     }
121 
122                     if (!input.isReadable()) {
123                         return State.NEEDS_MORE_INPUT;
124                     }
125 
126                     ByteBuffer decoderInputBuffer = decoder.getInputBuffer();
127                     decoderInputBuffer.clear();
128                     int readBytes = readBytes(input, decoderInputBuffer);
129                     decoder.push(readBytes);
130                     break;
131 
132                 case NEEDS_MORE_OUTPUT:
133                     forwardOutput(ctx);
134                     break;
135 
136                 default:
137                     return State.ERROR;
138             }
139         }
140     }
141 
142     private static int readBytes(ByteBuf in, ByteBuffer dest) {
143         int limit = Math.min(in.readableBytes(), dest.remaining());
144         ByteBuffer slice = dest.slice();
145         slice.limit(limit);
146         in.readBytes(slice);
147         dest.position(dest.position() + limit);
148         return limit;
149     }
150 
151     @Override
152     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
153         decoder = new DecoderJNI.Wrapper(inputBufferSize);
154     }
155 
156     @Override
157     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
158         needsRead = true;
159         if (destroyed) {
160             // Skip data received after finished.
161             in.skipBytes(in.readableBytes());
162             return;
163         }
164 
165         if (!in.isReadable()) {
166             return;
167         }
168 
169         try {
170             State state = decompress(ctx, in);
171             if (state == State.DONE) {
172                 destroy();
173             } else if (state == State.ERROR) {
174                 throw new DecompressionException("Brotli stream corrupted");
175             }
176         } catch (Exception e) {
177             destroy();
178             throw e;
179         } finally {
180             flushAccumBuffer(ctx);
181         }
182     }
183 
184     private void destroy() {
185         if (!destroyed) {
186             destroyed = true;
187             decoder.destroy();
188         }
189     }
190 
191     @Override
192     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
193         try {
194             destroy();
195         } finally {
196             super.handlerRemoved0(ctx);
197         }
198     }
199 
200     @Override
201     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
202         try {
203             destroy();
204         } finally {
205             super.channelInactive(ctx);
206         }
207     }
208 
209     @Override
210     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
211         // Discard bytes of the cumulation buffer if needed.
212         discardSomeReadBytes();
213 
214         if (needsRead && !ctx.channel().config().isAutoRead()) {
215             ctx.read();
216         }
217         ctx.fireChannelReadComplete();
218     }
219 }