1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.MessageToByteEncoder;
21  
22  import static io.netty.handler.codec.compression.Snappy.calculateChecksum;
23  
24  
25  
26  
27  
28  
29  public class SnappyFrameEncoder extends MessageToByteEncoder<ByteBuf> {
30  
31      private static final short SNAPPY_SLICE_SIZE = Short.MAX_VALUE;
32  
33      
34  
35  
36  
37  
38  
39      private static final int SNAPPY_SLICE_JUMBO_SIZE = 65535;
40  
41      
42  
43  
44  
45  
46      private static final int MIN_COMPRESSIBLE_LENGTH = 18;
47  
48      
49  
50  
51  
52      private static final byte[] STREAM_START = {
53          (byte) 0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
54      };
55  
56      public SnappyFrameEncoder() {
57          this(SNAPPY_SLICE_SIZE);
58      }
59  
60      
61  
62  
63  
64  
65      public static SnappyFrameEncoder snappyEncoderWithJumboFrames() {
66          return new SnappyFrameEncoder(SNAPPY_SLICE_JUMBO_SIZE);
67      }
68  
69      private SnappyFrameEncoder(int sliceSize) {
70          this.sliceSize = sliceSize;
71      }
72  
73      private final Snappy snappy = new Snappy();
74      private boolean started;
75      private final int sliceSize;
76  
77      @Override
78      protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
79          if (!in.isReadable()) {
80              return;
81          }
82  
83          if (!started) {
84              started = true;
85              out.writeBytes(STREAM_START);
86          }
87  
88          int dataLength = in.readableBytes();
89          if (dataLength > MIN_COMPRESSIBLE_LENGTH) {
90              for (;;) {
91                  final int lengthIdx = out.writerIndex() + 1;
92                  if (dataLength < MIN_COMPRESSIBLE_LENGTH) {
93                      ByteBuf slice = in.readSlice(dataLength);
94                      writeUnencodedChunk(slice, out, dataLength);
95                      break;
96                  }
97  
98                  out.writeInt(0);
99                  if (dataLength > sliceSize) {
100                     ByteBuf slice = in.readSlice(sliceSize);
101                     calculateAndWriteChecksum(slice, out);
102                     snappy.encode(slice, out, sliceSize);
103                     setChunkLength(out, lengthIdx);
104                     dataLength -= sliceSize;
105                 } else {
106                     ByteBuf slice = in.readSlice(dataLength);
107                     calculateAndWriteChecksum(slice, out);
108                     snappy.encode(slice, out, dataLength);
109                     setChunkLength(out, lengthIdx);
110                     break;
111                 }
112             }
113         } else {
114             writeUnencodedChunk(in, out, dataLength);
115         }
116     }
117 
118     private static void writeUnencodedChunk(ByteBuf in, ByteBuf out, int dataLength) {
119         out.writeByte(1);
120         writeChunkLength(out, dataLength + 4);
121         calculateAndWriteChecksum(in, out);
122         out.writeBytes(in, dataLength);
123     }
124 
125     private static void setChunkLength(ByteBuf out, int lengthIdx) {
126         int chunkLength = out.writerIndex() - lengthIdx - 3;
127         if (chunkLength >>> 24 != 0) {
128             throw new CompressionException("compressed data too large: " + chunkLength);
129         }
130         out.setMediumLE(lengthIdx, chunkLength);
131     }
132 
133     
134 
135 
136 
137 
138 
139     private static void writeChunkLength(ByteBuf out, int chunkLength) {
140         out.writeMediumLE(chunkLength);
141     }
142 
143     
144 
145 
146 
147 
148 
149     private static void calculateAndWriteChecksum(ByteBuf slice, ByteBuf out) {
150         out.writeIntLE(calculateChecksum(slice));
151     }
152 }