View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.MessageToByteEncoder;
21  
22  import static io.netty.handler.codec.compression.Snappy.calculateChecksum;
23  
24  /**
25   * Compresses a {@link ByteBuf} using the Snappy framing format.
26   *
27   * See <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format</a>.
28   */
29  public class SnappyFrameEncoder extends MessageToByteEncoder<ByteBuf> {
30  
31      private static final short SNAPPY_SLICE_SIZE = Short.MAX_VALUE;
32  
33      /**
34       * Both
35       * {@value io.netty.handler.codec.compression.SnappyFrameEncoder#SNAPPY_SLICE_SIZE}
36       * and {@value io.netty.handler.codec.compression.SnappyFrameEncoder#SNAPPY_SLICE_JUMBO_SIZE}
37       * are valid lengths for the Snappy framing format
38       */
39      private static final int SNAPPY_SLICE_JUMBO_SIZE = 65535;
40  
41      /**
42       * The minimum amount that we'll consider actually attempting to compress.
43       * This value is preamble + the minimum length our Snappy service will
44       * compress (instead of just emitting a literal).
45       */
46      private static final int MIN_COMPRESSIBLE_LENGTH = 18;
47  
48      /**
49       * All streams should start with the "Stream identifier", containing chunk
50       * type 0xff, a length field of 0x6, and 'sNaPpY' in ASCII.
51       */
52      private static final byte[] STREAM_START = {
53          (byte) 0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
54      };
55  
56      public SnappyFrameEncoder() {
57          this(SNAPPY_SLICE_SIZE);
58      }
59  
60      /**
61       * Create a new instance with a
62       * {@value io.netty.handler.codec.compression.SnappyFrameEncoder#SNAPPY_SLICE_JUMBO_SIZE}
63       * chunk size.
64       */
65      public static SnappyFrameEncoder snappyEncoderWithJumboFrames() {
66          return new SnappyFrameEncoder(SNAPPY_SLICE_JUMBO_SIZE);
67      }
68  
69      private SnappyFrameEncoder(int sliceSize) {
70          super(ByteBuf.class);
71          this.sliceSize = sliceSize;
72      }
73  
74      private final Snappy snappy = new Snappy();
75      private boolean started;
76      private final int sliceSize;
77  
78      @Override
79      protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
80          if (!in.isReadable()) {
81              return;
82          }
83  
84          if (!started) {
85              started = true;
86              out.writeBytes(STREAM_START);
87          }
88  
89          int dataLength = in.readableBytes();
90          if (dataLength > MIN_COMPRESSIBLE_LENGTH) {
91              for (;;) {
92                  final int lengthIdx = out.writerIndex() + 1;
93                  if (dataLength < MIN_COMPRESSIBLE_LENGTH) {
94                      ByteBuf slice = in.readSlice(dataLength);
95                      writeUnencodedChunk(slice, out, dataLength);
96                      break;
97                  }
98  
99                  out.writeInt(0);
100                 if (dataLength > sliceSize) {
101                     ByteBuf slice = in.readSlice(sliceSize);
102                     calculateAndWriteChecksum(slice, out);
103                     snappy.encode(slice, out, sliceSize);
104                     setChunkLength(out, lengthIdx);
105                     dataLength -= sliceSize;
106                 } else {
107                     ByteBuf slice = in.readSlice(dataLength);
108                     calculateAndWriteChecksum(slice, out);
109                     snappy.encode(slice, out, dataLength);
110                     setChunkLength(out, lengthIdx);
111                     break;
112                 }
113             }
114         } else {
115             writeUnencodedChunk(in, out, dataLength);
116         }
117     }
118 
119     private static void writeUnencodedChunk(ByteBuf in, ByteBuf out, int dataLength) {
120         out.writeByte(1);
121         writeChunkLength(out, dataLength + 4);
122         calculateAndWriteChecksum(in, out);
123         out.writeBytes(in, dataLength);
124     }
125 
126     private static void setChunkLength(ByteBuf out, int lengthIdx) {
127         int chunkLength = out.writerIndex() - lengthIdx - 3;
128         if (chunkLength >>> 24 != 0) {
129             throw new CompressionException("compressed data too large: " + chunkLength);
130         }
131         out.setMediumLE(lengthIdx, chunkLength);
132     }
133 
134     /**
135      * Writes the 2-byte chunk length to the output buffer.
136      *
137      * @param out The buffer to write to
138      * @param chunkLength The length to write
139      */
140     private static void writeChunkLength(ByteBuf out, int chunkLength) {
141         out.writeMediumLE(chunkLength);
142     }
143 
144     /**
145      * Calculates and writes the 4-byte checksum to the output buffer
146      *
147      * @param slice The data to calculate the checksum for
148      * @param out The output buffer to write the checksum to
149      */
150     private static void calculateAndWriteChecksum(ByteBuf slice, ByteBuf out) {
151         out.writeIntLE(calculateChecksum(slice));
152     }
153 }