View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.MessageToByteEncoder;
21  
22  import static io.netty.handler.codec.compression.Snappy.calculateChecksum;
23  
24  /**
25   * Compresses a {@link ByteBuf} using the Snappy framing format.
26   *
27   * See <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format</a>.
28   */
29  public class SnappyFrameEncoder extends MessageToByteEncoder<ByteBuf> {
30  
31      private static final short SNAPPY_SLICE_SIZE = Short.MAX_VALUE;
32  
33      /**
34       * Both
35       * {@value io.netty.handler.codec.compression.SnappyFrameEncoder#SNAPPY_SLICE_SIZE}
36       * and {@value io.netty.handler.codec.compression.SnappyFrameEncoder#SNAPPY_SLICE_JUMBO_SIZE}
37       * are valid lengths for the Snappy framing format
38       */
39      private static final int SNAPPY_SLICE_JUMBO_SIZE = 65535;
40  
41      /**
42       * The minimum amount that we'll consider actually attempting to compress.
43       * This value is preamble + the minimum length our Snappy service will
44       * compress (instead of just emitting a literal).
45       */
46      private static final int MIN_COMPRESSIBLE_LENGTH = 18;
47  
48      /**
49       * All streams should start with the "Stream identifier", containing chunk
50       * type 0xff, a length field of 0x6, and 'sNaPpY' in ASCII.
51       */
52      private static final byte[] STREAM_START = {
53          (byte) 0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
54      };
55  
56      public SnappyFrameEncoder() {
57          this(SNAPPY_SLICE_SIZE);
58      }
59  
60      /**
61       * Create a new instance with a
62       * {@value io.netty.handler.codec.compression.SnappyFrameEncoder#SNAPPY_SLICE_JUMBO_SIZE}
63       * chunk size.
64       */
65      public static SnappyFrameEncoder snappyEncoderWithJumboFrames() {
66          return new SnappyFrameEncoder(SNAPPY_SLICE_JUMBO_SIZE);
67      }
68  
69      private SnappyFrameEncoder(int sliceSize) {
70          this.sliceSize = sliceSize;
71      }
72  
73      private final Snappy snappy = new Snappy();
74      private boolean started;
75      private final int sliceSize;
76  
77      @Override
78      protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
79          if (!in.isReadable()) {
80              return;
81          }
82  
83          if (!started) {
84              started = true;
85              out.writeBytes(STREAM_START);
86          }
87  
88          int dataLength = in.readableBytes();
89          if (dataLength > MIN_COMPRESSIBLE_LENGTH) {
90              for (;;) {
91                  final int lengthIdx = out.writerIndex() + 1;
92                  if (dataLength < MIN_COMPRESSIBLE_LENGTH) {
93                      ByteBuf slice = in.readSlice(dataLength);
94                      writeUnencodedChunk(slice, out, dataLength);
95                      break;
96                  }
97  
98                  out.writeInt(0);
99                  if (dataLength > sliceSize) {
100                     ByteBuf slice = in.readSlice(sliceSize);
101                     calculateAndWriteChecksum(slice, out);
102                     snappy.encode(slice, out, sliceSize);
103                     setChunkLength(out, lengthIdx);
104                     dataLength -= sliceSize;
105                 } else {
106                     ByteBuf slice = in.readSlice(dataLength);
107                     calculateAndWriteChecksum(slice, out);
108                     snappy.encode(slice, out, dataLength);
109                     setChunkLength(out, lengthIdx);
110                     break;
111                 }
112             }
113         } else {
114             writeUnencodedChunk(in, out, dataLength);
115         }
116     }
117 
118     private static void writeUnencodedChunk(ByteBuf in, ByteBuf out, int dataLength) {
119         out.writeByte(1);
120         writeChunkLength(out, dataLength + 4);
121         calculateAndWriteChecksum(in, out);
122         out.writeBytes(in, dataLength);
123     }
124 
125     private static void setChunkLength(ByteBuf out, int lengthIdx) {
126         int chunkLength = out.writerIndex() - lengthIdx - 3;
127         if (chunkLength >>> 24 != 0) {
128             throw new CompressionException("compressed data too large: " + chunkLength);
129         }
130         out.setMediumLE(lengthIdx, chunkLength);
131     }
132 
133     /**
134      * Writes the 2-byte chunk length to the output buffer.
135      *
136      * @param out The buffer to write to
137      * @param chunkLength The length to write
138      */
139     private static void writeChunkLength(ByteBuf out, int chunkLength) {
140         out.writeMediumLE(chunkLength);
141     }
142 
143     /**
144      * Calculates and writes the 4-byte checksum to the output buffer
145      *
146      * @param slice The data to calculate the checksum for
147      * @param out The output buffer to write the checksum to
148      */
149     private static void calculateAndWriteChecksum(ByteBuf slice, ByteBuf out) {
150         out.writeIntLE(calculateChecksum(slice));
151     }
152 }