1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufUtil;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.MessageToByteEncoder;
22
23 import static io.netty.handler.codec.compression.Snappy.*;
24
25
26
27
28
29
30 public class SnappyFramedEncoder extends MessageToByteEncoder<ByteBuf> {
31
32
33
34
35
36 private static final int MIN_COMPRESSIBLE_LENGTH = 18;
37
38
39
40
41
42 private static final byte[] STREAM_START = {
43 (byte) 0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
44 };
45
46 private final Snappy snappy = new Snappy();
47 private boolean started;
48
49 @Override
50 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
51 if (!in.isReadable()) {
52 return;
53 }
54
55 if (!started) {
56 started = true;
57 out.writeBytes(STREAM_START);
58 }
59
60 int dataLength = in.readableBytes();
61 if (dataLength > MIN_COMPRESSIBLE_LENGTH) {
62 for (;;) {
63 final int lengthIdx = out.writerIndex() + 1;
64 if (dataLength < MIN_COMPRESSIBLE_LENGTH) {
65 ByteBuf slice = in.readSlice(dataLength);
66 writeUnencodedChunk(slice, out, dataLength);
67 break;
68 }
69
70 out.writeInt(0);
71 if (dataLength > Short.MAX_VALUE) {
72 ByteBuf slice = in.readSlice(Short.MAX_VALUE);
73 calculateAndWriteChecksum(slice, out);
74 snappy.encode(slice, out, Short.MAX_VALUE);
75 setChunkLength(out, lengthIdx);
76 dataLength -= Short.MAX_VALUE;
77 } else {
78 ByteBuf slice = in.readSlice(dataLength);
79 calculateAndWriteChecksum(slice, out);
80 snappy.encode(slice, out, dataLength);
81 setChunkLength(out, lengthIdx);
82 break;
83 }
84 }
85 } else {
86 writeUnencodedChunk(in, out, dataLength);
87 }
88 }
89
90 private static void writeUnencodedChunk(ByteBuf in, ByteBuf out, int dataLength) {
91 out.writeByte(1);
92 writeChunkLength(out, dataLength + 4);
93 calculateAndWriteChecksum(in, out);
94 out.writeBytes(in, dataLength);
95 }
96
97 private static void setChunkLength(ByteBuf out, int lengthIdx) {
98 int chunkLength = out.writerIndex() - lengthIdx - 3;
99 if (chunkLength >>> 24 != 0) {
100 throw new CompressionException("compressed data too large: " + chunkLength);
101 }
102 out.setMedium(lengthIdx, ByteBufUtil.swapMedium(chunkLength));
103 }
104
105
106
107
108
109
110
111 private static void writeChunkLength(ByteBuf out, int chunkLength) {
112 out.writeMedium(ByteBufUtil.swapMedium(chunkLength));
113 }
114
115
116
117
118
119
120
121 private static void calculateAndWriteChecksum(ByteBuf slice, ByteBuf out) {
122 out.writeInt(ByteBufUtil.swapInt(calculateChecksum(slice)));
123 }
124 }