View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufUtil;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.MessageToByteEncoder;
22  
23  import static io.netty.handler.codec.compression.Snappy.*;
24  
25  /**
26   * Compresses a {@link ByteBuf} using the Snappy framing format.
27   *
28   * See http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
29   */
30  public class SnappyFramedEncoder extends MessageToByteEncoder<ByteBuf> {
31      /**
32       * The minimum amount that we'll consider actually attempting to compress.
33       * This value is preamble + the minimum length our Snappy service will
34       * compress (instead of just emitting a literal).
35       */
36      private static final int MIN_COMPRESSIBLE_LENGTH = 18;
37  
38      /**
39       * All streams should start with the "Stream identifier", containing chunk
40       * type 0xff, a length field of 0x6, and 'sNaPpY' in ASCII.
41       */
42      private static final byte[] STREAM_START = {
43          (byte) 0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
44      };
45  
46      private final Snappy snappy = new Snappy();
47      private boolean started;
48  
49      @Override
50      protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
51          if (!in.isReadable()) {
52              return;
53          }
54  
55          if (!started) {
56              started = true;
57              out.writeBytes(STREAM_START);
58          }
59  
60          int dataLength = in.readableBytes();
61          if (dataLength > MIN_COMPRESSIBLE_LENGTH) {
62              for (;;) {
63                  final int lengthIdx = out.writerIndex() + 1;
64                  if (dataLength < MIN_COMPRESSIBLE_LENGTH) {
65                      ByteBuf slice = in.readSlice(dataLength);
66                      writeUnencodedChunk(slice, out, dataLength);
67                      break;
68                  }
69  
70                  out.writeInt(0);
71                  if (dataLength > Short.MAX_VALUE) {
72                      ByteBuf slice = in.readSlice(Short.MAX_VALUE);
73                      calculateAndWriteChecksum(slice, out);
74                      snappy.encode(slice, out, Short.MAX_VALUE);
75                      setChunkLength(out, lengthIdx);
76                      dataLength -= Short.MAX_VALUE;
77                  } else {
78                      ByteBuf slice = in.readSlice(dataLength);
79                      calculateAndWriteChecksum(slice, out);
80                      snappy.encode(slice, out, dataLength);
81                      setChunkLength(out, lengthIdx);
82                      break;
83                  }
84              }
85          } else {
86              writeUnencodedChunk(in, out, dataLength);
87          }
88      }
89  
90      private static void writeUnencodedChunk(ByteBuf in, ByteBuf out, int dataLength) {
91          out.writeByte(1);
92          writeChunkLength(out, dataLength + 4);
93          calculateAndWriteChecksum(in, out);
94          out.writeBytes(in, dataLength);
95      }
96  
97      private static void setChunkLength(ByteBuf out, int lengthIdx) {
98          int chunkLength = out.writerIndex() - lengthIdx - 3;
99          if (chunkLength >>> 24 != 0) {
100             throw new CompressionException("compressed data too large: " + chunkLength);
101         }
102         out.setMedium(lengthIdx, ByteBufUtil.swapMedium(chunkLength));
103     }
104 
105     /**
106      * Writes the 2-byte chunk length to the output buffer.
107      *
108      * @param out The buffer to write to
109      * @param chunkLength The length to write
110      */
111     private static void writeChunkLength(ByteBuf out, int chunkLength) {
112         out.writeMedium(ByteBufUtil.swapMedium(chunkLength));
113     }
114 
115     /**
116      * Calculates and writes the 4-byte checksum to the output buffer
117      *
118      * @param slice The data to calculate the checksum for
119      * @param out The output buffer to write the checksum to
120      */
121     private static void calculateAndWriteChecksum(ByteBuf slice, ByteBuf out) {
122         out.writeInt(ByteBufUtil.swapInt(calculateChecksum(slice)));
123     }
124 }