View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
19  import io.netty.buffer.ByteBuf;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.ByteToMessageDecoder;
22  
23  import java.io.Closeable;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.util.List;
27  
28  /**
29   * Decompresses a compressed block {@link ByteBuf} using the Zstandard algorithm.
30   * See <a href="https://facebook.github.io/zstd">Zstandard</a>.
31   */
32  public final class ZstdDecoder extends ByteToMessageDecoder {
33      // Don't use static here as we want to still allow to load the classes.
34      {
35          try {
36              Zstd.ensureAvailability();
37          } catch (Throwable throwable) {
38              throw new ExceptionInInitializerError(throwable);
39          }
40      }
41  
42      private final MutableByteBufInputStream inputStream = new MutableByteBufInputStream();
43      private ZstdInputStreamNoFinalizer zstdIs;
44  
45      private State currentState = State.DECOMPRESS_DATA;
46  
47      /**
48       * Current state of stream.
49       */
50      private enum State {
51          DECOMPRESS_DATA,
52          CORRUPTED
53      }
54  
55      @Override
56      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
57          try {
58              if (currentState == State.CORRUPTED) {
59                  in.skipBytes(in.readableBytes());
60                  return;
61              }
62              final int compressedLength = in.readableBytes();
63  
64              inputStream.current = in;
65  
66              ByteBuf outBuffer = null;
67              try {
68                  int w;
69                  do {
70                      if (outBuffer == null) {
71                          // Let's start with the compressedLength * 2 as often we will not have everything
72                          // we need in the in buffer and don't want to reserve too much memory.
73                          outBuffer = ctx.alloc().heapBuffer(compressedLength * 2);
74                      }
75                      do {
76                          w = outBuffer.writeBytes(zstdIs, outBuffer.writableBytes());
77                      } while (w != -1 && outBuffer.isWritable());
78                      if (outBuffer.isReadable()) {
79                          out.add(outBuffer);
80                          outBuffer = null;
81                      }
82                  } while (w != -1);
83              } finally {
84                  if (outBuffer != null) {
85                      outBuffer.release();
86                  }
87              }
88          } catch (Exception e) {
89              currentState = State.CORRUPTED;
90              throw new DecompressionException(e);
91          } finally {
92              inputStream.current = null;
93          }
94      }
95  
96      @Override
97      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
98          super.handlerAdded(ctx);
99          zstdIs = new ZstdInputStreamNoFinalizer(inputStream);
100         zstdIs.setContinuous(true);
101     }
102 
103     @Override
104     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
105         try {
106             closeSilently(zstdIs);
107         } finally {
108             super.handlerRemoved0(ctx);
109         }
110     }
111 
112     private static void closeSilently(Closeable closeable) {
113         if (closeable != null) {
114             try {
115                 closeable.close();
116             } catch (IOException ignore) {
117                 // ignore
118             }
119         }
120     }
121 
122     private static final class MutableByteBufInputStream extends InputStream {
123         ByteBuf current;
124 
125         @Override
126         public int read() {
127             if (current == null || !current.isReadable()) {
128                 return -1;
129             }
130             return current.readByte() & 0xff;
131         }
132 
133         @Override
134         public int read(byte[] b, int off, int len) {
135             int available = available();
136             if (available == 0) {
137                 return -1;
138             }
139 
140             len = Math.min(available, len);
141             current.readBytes(b, off, len);
142             return len;
143         }
144 
145         @Override
146         public int available() {
147             return current == null ? 0 : current.readableBytes();
148         }
149     }
150 }