View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.compression;
18  
19  import com.aayushatharva.brotli4j.decoder.DecoderJNI;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.handler.codec.ByteToMessageDecoder;
23  import io.netty.util.internal.ObjectUtil;
24  
25  import java.nio.ByteBuffer;
26  import java.util.List;
27  
28  /**
29   * Decompresses a {@link ByteBuf} encoded with the brotli format.
30   *
31   * See <a href="https://github.com/google/brotli">brotli</a>.
32   */
33  public final class BrotliDecoder extends ByteToMessageDecoder {
34  
35      private static final int DEFAULT_MAX_FORWARD_BYTES = CompressionUtil.DEFAULT_MAX_FORWARD_BYTES;
36  
37      private enum State {
38          DONE, NEEDS_MORE_INPUT, ERROR
39      }
40  
41      static {
42          try {
43              Brotli.ensureAvailability();
44          } catch (Throwable throwable) {
45              throw new ExceptionInInitializerError(throwable);
46          }
47      }
48  
49      private final int inputBufferSize;
50      private DecoderJNI.Wrapper decoder;
51      private boolean destroyed;
52      private boolean needsRead;
53      private ByteBuf accumBuffer;
54  
55      /**
56       * Creates a new BrotliDecoder with a default 8kB input buffer
57       */
58      public BrotliDecoder() {
59          this(8 * 1024);
60      }
61  
62      /**
63       * Creates a new BrotliDecoder
64       * @param inputBufferSize desired size of the input buffer in bytes
65       */
66      public BrotliDecoder(int inputBufferSize) {
67          this.inputBufferSize = ObjectUtil.checkPositive(inputBufferSize, "inputBufferSize");
68      }
69  
70      private void forwardOutput(ChannelHandlerContext ctx) {
71          ByteBuffer nativeBuffer = decoder.pull();
72          // nativeBuffer actually wraps brotli's internal buffer so we need to copy its content
73          int remaining = nativeBuffer.remaining();
74          if (accumBuffer == null) {
75              accumBuffer = ctx.alloc().buffer(remaining);
76          }
77          accumBuffer.writeBytes(nativeBuffer);
78          needsRead = false;
79          if (accumBuffer.readableBytes() >= DEFAULT_MAX_FORWARD_BYTES) {
80              ctx.fireChannelRead(accumBuffer);
81              accumBuffer = null;
82          }
83      }
84  
85      private void flushAccumBuffer(ChannelHandlerContext ctx) {
86          if (accumBuffer != null && accumBuffer.isReadable()) {
87              ctx.fireChannelRead(accumBuffer);
88          } else if (accumBuffer != null) {
89              accumBuffer.release();
90          }
91          accumBuffer = null;
92      }
93  
94      private State decompress(ChannelHandlerContext ctx, ByteBuf input) {
95          for (;;) {
96              switch (decoder.getStatus()) {
97                  case DONE:
98                      return State.DONE;
99  
100                 case OK:
101                     decoder.push(0);
102                     break;
103 
104                 case NEEDS_MORE_INPUT:
105                     if (decoder.hasOutput()) {
106                         forwardOutput(ctx);
107                     }
108 
109                     if (!input.isReadable()) {
110                         return State.NEEDS_MORE_INPUT;
111                     }
112 
113                     ByteBuffer decoderInputBuffer = decoder.getInputBuffer();
114                     decoderInputBuffer.clear();
115                     int readBytes = readBytes(input, decoderInputBuffer);
116                     decoder.push(readBytes);
117                     break;
118 
119                 case NEEDS_MORE_OUTPUT:
120                     forwardOutput(ctx);
121                     break;
122 
123                 default:
124                     return State.ERROR;
125             }
126         }
127     }
128 
129     private static int readBytes(ByteBuf in, ByteBuffer dest) {
130         int limit = Math.min(in.readableBytes(), dest.remaining());
131         ByteBuffer slice = dest.slice();
132         slice.limit(limit);
133         in.readBytes(slice);
134         dest.position(dest.position() + limit);
135         return limit;
136     }
137 
138     @Override
139     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
140         decoder = new DecoderJNI.Wrapper(inputBufferSize);
141     }
142 
143     @Override
144     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
145         needsRead = true;
146         if (destroyed) {
147             // Skip data received after finished.
148             in.skipBytes(in.readableBytes());
149             return;
150         }
151 
152         if (!in.isReadable()) {
153             return;
154         }
155 
156         try {
157             State state = decompress(ctx, in);
158             if (state == State.DONE) {
159                 destroy();
160             } else if (state == State.ERROR) {
161                 throw new DecompressionException("Brotli stream corrupted");
162             }
163         } catch (Exception e) {
164             destroy();
165             throw e;
166         } finally {
167             flushAccumBuffer(ctx);
168         }
169     }
170 
171     private void destroy() {
172         if (!destroyed) {
173             destroyed = true;
174             decoder.destroy();
175         }
176     }
177 
178     @Override
179     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
180         try {
181             destroy();
182         } finally {
183             super.handlerRemoved0(ctx);
184         }
185     }
186 
187     @Override
188     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
189         try {
190             destroy();
191         } finally {
192             super.channelInactive(ctx);
193         }
194     }
195 
196     @Override
197     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
198         // Discard bytes of the cumulation buffer if needed.
199         discardSomeReadBytes();
200 
201         if (needsRead && !ctx.channel().config().isAutoRead()) {
202             ctx.read();
203         }
204         ctx.fireChannelReadComplete();
205     }
206 }