View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.compression;
17  
18  import io.netty5.buffer.BufferUtil;
19  import io.netty5.buffer.api.Buffer;
20  import io.netty5.buffer.api.BufferAllocator;
21  
22  import java.util.function.Supplier;
23  
24  import static io.netty5.handler.codec.compression.Snappy.calculateChecksum;
25  
26  /**
27   * Compresses a {@link Buffer} using the Snappy framing format.
28   *
29   * See <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format</a>.
30   */
31  public final class SnappyCompressor implements Compressor {
32      private enum State {
33          Init,
34          Started,
35          Finished,
36          Closed
37      }
38  
39      /**
40       * The minimum amount that we'll consider actually attempting to compress.
41       * This value is preamble + the minimum length our Snappy service will
42       * compress (instead of just emitting a literal).
43       */
44      private static final int MIN_COMPRESSIBLE_LENGTH = 18;
45  
46      /**
47       * All streams should start with the "Stream identifier", containing chunk
48       * type 0xff, a length field of 0x6, and 'sNaPpY' in ASCII.
49       */
50      private static final byte[] STREAM_START = {
51          (byte) 0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
52      };
53  
54      private final Snappy snappy = new Snappy();
55      private State state = State.Init;
56  
57      private SnappyCompressor() { }
58  
59      /**
60       * Creates a new snappy compressor factory.
61       *
62       * @return  the new instance.
63       */
64      public static Supplier<SnappyCompressor> newFactory() {
65          return SnappyCompressor::new;
66      }
67  
68      @Override
69      public Buffer compress(Buffer in, BufferAllocator allocator) throws CompressionException {
70          switch (state) {
71              case Finished:
72                  return allocator.allocate(0);
73              case Closed:
74                  throw new CompressionException("Compressor closed");
75              default:
76                  // TODO: Make some smart decision about the initial capacity.
77                  Buffer out = allocator.allocate(256);
78                  try {
79                      if (state == State.Init) {
80                          state = State.Started;
81                          out.writeBytes(STREAM_START);
82                      } else if (state != State.Started) {
83                          throw new IllegalStateException();
84                      }
85  
86                      int dataLength = in.readableBytes();
87                      if (dataLength > MIN_COMPRESSIBLE_LENGTH) {
88                          for (;;) {
89                              final int lengthIdx = out.writerOffset() + 1;
90                              if (dataLength < MIN_COMPRESSIBLE_LENGTH) {
91                                  try (Buffer slice = in.readSplit(dataLength)) {
92                                      writeUnencodedChunk(slice, out, dataLength);
93                                      break;
94                                  }
95                              }
96  
97                              out.writeInt(0);
98                              if (dataLength > Short.MAX_VALUE) {
99                                  try (Buffer slice = in.readSplit(Short.MAX_VALUE)) {
100                                     calculateAndWriteChecksum(slice, out);
101                                     snappy.encode(slice, out, Short.MAX_VALUE);
102                                     setChunkLength(out, lengthIdx);
103                                     dataLength -= Short.MAX_VALUE;
104                                 }
105                             } else {
106                                 try (Buffer slice = in.readSplit(dataLength)) {
107                                     calculateAndWriteChecksum(slice, out);
108                                     snappy.encode(slice, out, dataLength);
109                                     setChunkLength(out, lengthIdx);
110                                     break;
111                                 }
112                             }
113                         }
114                     } else {
115                         writeUnencodedChunk(in, out, dataLength);
116                     }
117                     return out;
118                 } catch (Throwable cause) {
119                     out.close();
120                     throw cause;
121                 }
122         }
123     }
124 
125     @Override
126     public Buffer finish(BufferAllocator allocator) {
127         switch (state) {
128             case Closed:
129                 throw new CompressionException("Compressor closed");
130             case Finished:
131             case Init:
132             case Started:
133                 state = State.Finished;
134                 return allocator.allocate(0);
135             default:
136                 throw new IllegalStateException();
137         }
138     }
139 
140     @Override
141     public boolean isFinished() {
142         switch (state) {
143             case Finished:
144             case Closed:
145                 return true;
146             default:
147                 return false;
148         }
149     }
150 
151     @Override
152     public boolean isClosed() {
153         return state == State.Closed;
154     }
155 
156     @Override
157     public void close() {
158         state = State.Closed;
159     }
160 
161     private static void writeUnencodedChunk(Buffer in, Buffer out, int dataLength) {
162         out.writeByte((byte) 1);
163         writeChunkLength(out, dataLength + 4);
164         calculateAndWriteChecksum(in, out);
165         in.copyInto(in.readerOffset(), out, out.writerOffset(), dataLength);
166         in.skipReadableBytes(dataLength);
167         out.skipWritableBytes(dataLength);
168     }
169 
170     private static void setChunkLength(Buffer out, int lengthIdx) {
171         int chunkLength = out.writerOffset() - lengthIdx - 3;
172         if (chunkLength >>> 24 != 0) {
173             throw new CompressionException("compressed data too large: " + chunkLength);
174         }
175         out.setMedium(lengthIdx, BufferUtil.reverseMedium(chunkLength));
176     }
177 
178     /**
179      * Writes the 2-byte chunk length to the output buffer.
180      *
181      * @param out The buffer to write to
182      * @param chunkLength The length to write
183      */
184     private static void writeChunkLength(Buffer out, int chunkLength) {
185         out.writeMedium(BufferUtil.reverseMedium(chunkLength));
186     }
187 
188     /**
189      * Calculates and writes the 4-byte checksum to the output buffer
190      *
191      * @param slice The data to calculate the checksum for
192      * @param out The output buffer to write the checksum to
193      */
194     private static void calculateAndWriteChecksum(Buffer slice, Buffer out) {
195         out.writeInt(Integer.reverseBytes(calculateChecksum(slice)));
196     }
197 }