1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.handler.codec.MessageToByteEncoder;
21
22 import static io.netty.handler.codec.compression.Snappy.calculateChecksum;
23
24
25
26
27
28
29 public class SnappyFrameEncoder extends MessageToByteEncoder<ByteBuf> {
30
31 private static final short SNAPPY_SLICE_SIZE = Short.MAX_VALUE;
32
33
34
35
36
37
38
39 private static final int SNAPPY_SLICE_JUMBO_SIZE = 65535;
40
41
42
43
44
45
46 private static final int MIN_COMPRESSIBLE_LENGTH = 18;
47
48
49
50
51
52 private static final byte[] STREAM_START = {
53 (byte) 0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
54 };
55
56 public SnappyFrameEncoder() {
57 this(SNAPPY_SLICE_SIZE);
58 }
59
60
61
62
63
64
65 public static SnappyFrameEncoder snappyEncoderWithJumboFrames() {
66 return new SnappyFrameEncoder(SNAPPY_SLICE_JUMBO_SIZE);
67 }
68
69 private SnappyFrameEncoder(int sliceSize) {
70 super(ByteBuf.class);
71 this.sliceSize = sliceSize;
72 }
73
74 private final Snappy snappy = new Snappy();
75 private boolean started;
76 private final int sliceSize;
77
78 @Override
79 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
80 if (!in.isReadable()) {
81 return;
82 }
83
84 if (!started) {
85 started = true;
86 out.writeBytes(STREAM_START);
87 }
88
89 int dataLength = in.readableBytes();
90 if (dataLength > MIN_COMPRESSIBLE_LENGTH) {
91 for (;;) {
92 final int lengthIdx = out.writerIndex() + 1;
93 if (dataLength < MIN_COMPRESSIBLE_LENGTH) {
94 ByteBuf slice = in.readSlice(dataLength);
95 writeUnencodedChunk(slice, out, dataLength);
96 break;
97 }
98
99 out.writeInt(0);
100 if (dataLength > sliceSize) {
101 ByteBuf slice = in.readSlice(sliceSize);
102 calculateAndWriteChecksum(slice, out);
103 snappy.encode(slice, out, sliceSize);
104 setChunkLength(out, lengthIdx);
105 dataLength -= sliceSize;
106 } else {
107 ByteBuf slice = in.readSlice(dataLength);
108 calculateAndWriteChecksum(slice, out);
109 snappy.encode(slice, out, dataLength);
110 setChunkLength(out, lengthIdx);
111 break;
112 }
113 }
114 } else {
115 writeUnencodedChunk(in, out, dataLength);
116 }
117 }
118
119 private static void writeUnencodedChunk(ByteBuf in, ByteBuf out, int dataLength) {
120 out.writeByte(1);
121 writeChunkLength(out, dataLength + 4);
122 calculateAndWriteChecksum(in, out);
123 out.writeBytes(in, dataLength);
124 }
125
126 private static void setChunkLength(ByteBuf out, int lengthIdx) {
127 int chunkLength = out.writerIndex() - lengthIdx - 3;
128 if (chunkLength >>> 24 != 0) {
129 throw new CompressionException("compressed data too large: " + chunkLength);
130 }
131 out.setMediumLE(lengthIdx, chunkLength);
132 }
133
134
135
136
137
138
139
140 private static void writeChunkLength(ByteBuf out, int chunkLength) {
141 out.writeMediumLE(chunkLength);
142 }
143
144
145
146
147
148
149
150 private static void calculateAndWriteChecksum(ByteBuf slice, ByteBuf out) {
151 out.writeIntLE(calculateChecksum(slice));
152 }
153 }