1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
19 import io.netty.buffer.ByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.ByteToMessageDecoder;
22
23 import java.io.Closeable;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.List;
27
28
29
30
31
32 public final class ZstdDecoder extends ByteToMessageDecoder {
33
34 {
35 try {
36 Zstd.ensureAvailability();
37 } catch (Throwable throwable) {
38 throw new ExceptionInInitializerError(throwable);
39 }
40 }
41
42 private final MutableByteBufInputStream inputStream = new MutableByteBufInputStream();
43 private ZstdInputStreamNoFinalizer zstdIs;
44
45 private State currentState = State.DECOMPRESS_DATA;
46
47
48
49
50 private enum State {
51 DECOMPRESS_DATA,
52 CORRUPTED
53 }
54
55 @Override
56 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
57 try {
58 if (currentState == State.CORRUPTED) {
59 in.skipBytes(in.readableBytes());
60 return;
61 }
62 final int compressedLength = in.readableBytes();
63
64 inputStream.current = in;
65
66 ByteBuf outBuffer = null;
67 try {
68 int w;
69 do {
70 if (outBuffer == null) {
71
72
73 outBuffer = ctx.alloc().heapBuffer(compressedLength * 2);
74 }
75 do {
76 w = outBuffer.writeBytes(zstdIs, outBuffer.writableBytes());
77 } while (w != -1 && outBuffer.isWritable());
78 if (outBuffer.isReadable()) {
79 out.add(outBuffer);
80 outBuffer = null;
81 }
82 } while (w != -1);
83 } finally {
84 if (outBuffer != null) {
85 outBuffer.release();
86 }
87 }
88 } catch (Exception e) {
89 currentState = State.CORRUPTED;
90 throw new DecompressionException(e);
91 } finally {
92 inputStream.current = null;
93 }
94 }
95
96 @Override
97 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
98 super.handlerAdded(ctx);
99 zstdIs = new ZstdInputStreamNoFinalizer(inputStream);
100 zstdIs.setContinuous(true);
101 }
102
103 @Override
104 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
105 try {
106 closeSilently(zstdIs);
107 } finally {
108 super.handlerRemoved0(ctx);
109 }
110 }
111
112 private static void closeSilently(Closeable closeable) {
113 if (closeable != null) {
114 try {
115 closeable.close();
116 } catch (IOException ignore) {
117
118 }
119 }
120 }
121
122 private static final class MutableByteBufInputStream extends InputStream {
123 ByteBuf current;
124
125 @Override
126 public int read() {
127 if (current == null || !current.isReadable()) {
128 return -1;
129 }
130 return current.readByte() & 0xff;
131 }
132
133 @Override
134 public int read(byte[] b, int off, int len) {
135 int available = available();
136 if (available == 0) {
137 return -1;
138 }
139
140 len = Math.min(available, len);
141 current.readBytes(b, off, len);
142 return len;
143 }
144
145 @Override
146 public int available() {
147 return current == null ? 0 : current.readableBytes();
148 }
149 }
150 }