1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.channel.ChannelHandlerContext;
20 import io.netty5.channel.ChannelOption;
21 import io.netty5.handler.codec.CodecException;
22 import io.netty5.handler.codec.MessageToMessageDecoder;
23 import io.netty5.handler.codec.compression.Decompressor;
24 import io.netty5.util.Resource;
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObject> {
46
47 static final String IDENTITY = HttpHeaderValues.IDENTITY.toString();
48
49 protected ChannelHandlerContext ctx;
50 private Decompressor decompressor;
51 private boolean continueResponse;
52 private boolean needRead = true;
53
54 @Override
55 protected void decodeAndClose(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
56 boolean dispose = true;
57 try {
58 if (msg instanceof HttpResponse && ((HttpResponse) msg).status().code() == 100) {
59
60 if (!(msg instanceof LastHttpContent)) {
61 continueResponse = true;
62 }
63
64 fireChannelRead(ctx, msg);
65 return;
66 }
67
68 if (continueResponse) {
69 if (msg instanceof LastHttpContent) {
70 continueResponse = false;
71 }
72
73 fireChannelRead(ctx, msg);
74 return;
75 }
76
77 if (msg instanceof HttpMessage) {
78 cleanup();
79 final HttpMessage message = (HttpMessage) msg;
80 final HttpHeaders headers = message.headers();
81
82
83 String contentEncoding = headers.get(HttpHeaderNames.CONTENT_ENCODING);
84 if (contentEncoding != null) {
85 contentEncoding = contentEncoding.trim();
86 } else {
87 String transferEncoding = headers.get(HttpHeaderNames.TRANSFER_ENCODING);
88 if (transferEncoding != null) {
89 int idx = transferEncoding.indexOf(",");
90 if (idx != -1) {
91 contentEncoding = transferEncoding.substring(0, idx).trim();
92 } else {
93 contentEncoding = transferEncoding.trim();
94 }
95 } else {
96 contentEncoding = IDENTITY;
97 }
98 }
99 decompressor = newContentDecoder(contentEncoding);
100
101 if (decompressor == null) {
102 dispose = false;
103 fireChannelRead(ctx, message);
104 return;
105 }
106
107
108
109
110
111 if (headers.contains(HttpHeaderNames.CONTENT_LENGTH)) {
112 headers.remove(HttpHeaderNames.CONTENT_LENGTH);
113 headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
114 }
115
116
117
118
119 CharSequence targetContentEncoding = getTargetContentEncoding(contentEncoding);
120 if (HttpHeaderValues.IDENTITY.contentEquals(targetContentEncoding)) {
121
122
123 headers.remove(HttpHeaderNames.CONTENT_ENCODING);
124 } else {
125 headers.set(HttpHeaderNames.CONTENT_ENCODING, targetContentEncoding);
126 }
127
128 if (message instanceof HttpContent) {
129
130
131
132
133 HttpMessage copy;
134 if (message instanceof HttpRequest) {
135 HttpRequest r = (HttpRequest) message;
136 copy = new DefaultHttpRequest(r.protocolVersion(), r.method(), r.uri());
137 } else if (message instanceof HttpResponse) {
138 HttpResponse r = (HttpResponse) message;
139 copy = new DefaultHttpResponse(r.protocolVersion(), r.status());
140 } else {
141 throw new CodecException("Object of class " + message.getClass().getName() +
142 " is not an HttpRequest or HttpResponse");
143 }
144 copy.headers().set(message.headers());
145 copy.setDecoderResult(message.decoderResult());
146 fireChannelRead(ctx, copy);
147 } else {
148 fireChannelRead(ctx, message);
149 }
150 }
151
152 if (msg instanceof HttpContent) {
153 final HttpContent<?> c = (HttpContent<?>) msg;
154 if (decompressor == null) {
155 dispose = false;
156 fireChannelRead(ctx, c);
157 } else {
158 decodeContent(ctx, c);
159 }
160 }
161 } finally {
162 if (dispose) {
163 Resource.dispose(msg);
164 }
165 }
166 }
167
168 private void decodeContent(ChannelHandlerContext ctx, HttpContent<?> c) {
169 final Buffer payload = c.payload();
170
171 assert decompressor != null;
172
173 while (!decompressor.isFinished()) {
174 int idx = payload.readerOffset();
175 Buffer decompressed = decompressor.decompress(payload, ctx.bufferAllocator());
176 if (decompressed != null) {
177 if (decompressed.readableBytes() == 0) {
178 decompressed.close();
179 return;
180 }
181 ctx.fireChannelRead(new DefaultHttpContent(decompressed));
182 } else if (idx == payload.readerOffset()) {
183 break;
184 }
185 }
186
187 if (c instanceof LastHttpContent) {
188 decompressor.close();
189 decompressor = null;
190
191 LastHttpContent<?> last = (LastHttpContent<?>) c;
192
193
194 HttpHeaders headers = last.trailingHeaders();
195 if (headers.isEmpty()) {
196 fireChannelRead(ctx, new EmptyLastHttpContent(ctx.bufferAllocator()));
197 } else {
198 fireChannelRead(ctx, new DefaultLastHttpContent(ctx.bufferAllocator().allocate(0), headers));
199 }
200 }
201 }
202
203 @Override
204 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
205 boolean needRead = this.needRead;
206 this.needRead = true;
207
208 try {
209 ctx.fireChannelReadComplete();
210 } finally {
211 if (needRead && !ctx.channel().getOption(ChannelOption.AUTO_READ)) {
212 ctx.read();
213 }
214 }
215 }
216
217
218
219
220
221
222
223
224
225
226 protected abstract Decompressor newContentDecoder(String contentEncoding) throws Exception;
227
228
229
230
231
232
233
234
235
236 protected String getTargetContentEncoding(
237 @SuppressWarnings("UnusedParameters") String contentEncoding) throws Exception {
238 return IDENTITY;
239 }
240
241 @Override
242 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
243 cleanupSafely(ctx);
244 super.handlerRemoved(ctx);
245 }
246
247 @Override
248 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
249 cleanupSafely(ctx);
250 super.channelInactive(ctx);
251 }
252
253 @Override
254 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
255 this.ctx = ctx;
256 super.handlerAdded(ctx);
257 }
258
259 private void cleanup() {
260 if (decompressor != null) {
261
262 decompressor.close();
263 decompressor = null;
264 }
265 }
266
267 private void cleanupSafely(ChannelHandlerContext ctx) {
268 try {
269 cleanup();
270 } catch (Throwable cause) {
271
272
273 ctx.fireChannelExceptionCaught(cause);
274 }
275 }
276
277 private void fireChannelRead(ChannelHandlerContext ctx, Object msg) {
278 needRead = false;
279 ctx.fireChannelRead(msg);
280 }
281 }