1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http.websocketx.extensions.compression;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.CompositeBuffer;
20 import io.netty5.util.Send;
21 import io.netty5.channel.ChannelHandlerContext;
22 import io.netty5.channel.embedded.EmbeddedChannel;
23 import io.netty5.handler.codec.CodecException;
24 import io.netty5.handler.codec.compression.ZlibCodecFactory;
25 import io.netty5.handler.codec.compression.ZlibWrapper;
26 import io.netty5.handler.codec.http.websocketx.BinaryWebSocketFrame;
27 import io.netty5.handler.codec.http.websocketx.ContinuationWebSocketFrame;
28 import io.netty5.handler.codec.http.websocketx.TextWebSocketFrame;
29 import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
30 import io.netty5.handler.codec.http.websocketx.extensions.WebSocketExtensionEncoder;
31 import io.netty5.handler.codec.http.websocketx.extensions.WebSocketExtensionFilter;
32
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Objects;
36 import java.util.function.Supplier;
37
38 import static io.netty5.buffer.api.DefaultBufferAllocators.preferredAllocator;
39 import static io.netty5.handler.codec.http.websocketx.extensions.compression.DeflateDecoder.FRAME_TAIL_LENGTH;
40
41
42
43
44
45 abstract class DeflateEncoder extends WebSocketExtensionEncoder {
46 static final Supplier<Buffer> EMPTY_DEFLATE_BLOCK;
47 static final int EMPTY_DEFLATE_BLOCK_LENGTH;
48 static {
49 byte[] emptyDeflate = { 0x00 };
50 EMPTY_DEFLATE_BLOCK = preferredAllocator().constBufferSupplier(emptyDeflate);
51 EMPTY_DEFLATE_BLOCK_LENGTH = emptyDeflate.length;
52 }
53
54 private final int compressionLevel;
55 private final int windowSize;
56 private final boolean noContext;
57 private final WebSocketExtensionFilter extensionEncoderFilter;
58
59 private EmbeddedChannel encoder;
60
61
62
63
64
65
66
67
68 DeflateEncoder(int compressionLevel, int windowSize, boolean noContext,
69 WebSocketExtensionFilter extensionEncoderFilter) {
70 this.compressionLevel = compressionLevel;
71 this.windowSize = windowSize;
72 this.noContext = noContext;
73 this.extensionEncoderFilter = Objects.requireNonNull(extensionEncoderFilter, "extensionEncoderFilter");
74 }
75
76
77
78
79 protected WebSocketExtensionFilter extensionEncoderFilter() {
80 return extensionEncoderFilter;
81 }
82
83
84
85
86
87 protected abstract int rsv(WebSocketFrame msg);
88
89
90
91
92
93 protected abstract boolean removeFrameTail(WebSocketFrame msg);
94
95 @Override
96 protected void encodeAndClose(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
97 final Buffer compressedContent;
98 if (msg.binaryData().readableBytes() > 0) {
99 compressedContent = compressContent(ctx, msg);
100 } else if (msg.isFinalFragment()) {
101
102
103 compressedContent = EMPTY_DEFLATE_BLOCK.get();
104 } else {
105 msg.close();
106 throw new CodecException("cannot compress content buffer");
107 }
108
109 final WebSocketFrame outMsg;
110 if (msg instanceof TextWebSocketFrame) {
111 outMsg = new TextWebSocketFrame(msg.isFinalFragment(), rsv(msg), compressedContent);
112 } else if (msg instanceof BinaryWebSocketFrame) {
113 outMsg = new BinaryWebSocketFrame(msg.isFinalFragment(), rsv(msg), compressedContent);
114 } else if (msg instanceof ContinuationWebSocketFrame) {
115 outMsg = new ContinuationWebSocketFrame(msg.isFinalFragment(), rsv(msg), compressedContent);
116 } else {
117 compressedContent.close();
118 throw new CodecException("unexpected frame type: " + msg.getClass().getName());
119 }
120
121 out.add(outMsg);
122 }
123
124 @Override
125 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
126 cleanup();
127 super.handlerRemoved(ctx);
128 }
129
130 private Buffer compressContent(ChannelHandlerContext ctx, WebSocketFrame msg) {
131 if (encoder == null) {
132 encoder = new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(
133 ZlibWrapper.NONE, compressionLevel, windowSize, 8));
134 }
135
136 encoder.writeOutbound(msg.binaryData());
137
138 List<Send<Buffer>> bufferList = new ArrayList<>();
139 for (;;) {
140 Buffer partCompressedContent = encoder.readOutbound();
141 if (partCompressedContent == null) {
142 break;
143 }
144 if (partCompressedContent.readableBytes() == 0) {
145 partCompressedContent.close();
146 continue;
147 }
148 bufferList.add(partCompressedContent.send());
149 }
150
151 if (bufferList.isEmpty()) {
152 throw new CodecException("cannot read compressed buffer");
153 }
154
155 if (msg.isFinalFragment() && noContext) {
156 cleanup();
157 }
158
159 Buffer compressedContent;
160 CompositeBuffer fullCompressedContent = ctx.bufferAllocator().compose(bufferList);
161 if (removeFrameTail(msg)) {
162 int realLength = fullCompressedContent.readableBytes() - FRAME_TAIL_LENGTH;
163 compressedContent = fullCompressedContent.readerOffset(0).writerOffset(realLength);
164 } else {
165 compressedContent = fullCompressedContent;
166 }
167
168 return compressedContent;
169 }
170
171 private void cleanup() {
172 if (encoder != null) {
173
174 encoder.finishAndReleaseAll();
175 encoder = null;
176 }
177 }
178 }