1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.http3;
18
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.ChannelHandler;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelOutboundHandler;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.EncoderException;
26 import io.netty.handler.codec.UnsupportedMessageTypeException;
27 import io.netty.handler.codec.http.DefaultHttpContent;
28 import io.netty.handler.codec.http.DefaultLastHttpContent;
29 import io.netty.handler.codec.http.FullHttpMessage;
30 import io.netty.handler.codec.http.FullHttpResponse;
31 import io.netty.handler.codec.http.HttpContent;
32 import io.netty.handler.codec.http.HttpHeaderNames;
33 import io.netty.handler.codec.http.HttpHeaderValues;
34 import io.netty.handler.codec.http.HttpMessage;
35 import io.netty.handler.codec.http.HttpObject;
36 import io.netty.handler.codec.http.HttpRequest;
37 import io.netty.handler.codec.http.HttpResponse;
38 import io.netty.handler.codec.http.HttpResponseStatus;
39 import io.netty.handler.codec.http.HttpScheme;
40 import io.netty.handler.codec.http.HttpUtil;
41 import io.netty.handler.codec.http.HttpVersion;
42 import io.netty.handler.codec.http.LastHttpContent;
43 import io.netty.handler.codec.quic.QuicStreamChannel;
44 import io.netty.util.concurrent.PromiseCombiner;
45 import org.jetbrains.annotations.Nullable;
46
47 import java.net.SocketAddress;
48
49
50
51
52
53
54
55
56
57
58 public final class Http3FrameToHttpObjectCodec extends Http3RequestStreamInboundHandler
59 implements ChannelOutboundHandler {
60
61 private final boolean isServer;
62 private final boolean validateHeaders;
63 private boolean inboundTranslationInProgress;
64
65 public Http3FrameToHttpObjectCodec(final boolean isServer,
66 final boolean validateHeaders) {
67 this.isServer = isServer;
68 this.validateHeaders = validateHeaders;
69 }
70
71 public Http3FrameToHttpObjectCodec(final boolean isServer) {
72 this(isServer, true);
73 }
74
75 @Override
76 public boolean isSharable() {
77 return false;
78 }
79
80 @Override
81 protected void channelRead(ChannelHandlerContext ctx, Http3HeadersFrame frame) throws Exception {
82 Http3Headers headers = frame.headers();
83 long id = ((QuicStreamChannel) ctx.channel()).streamId();
84
85 final CharSequence status = headers.status();
86
87
88
89 if (null != status && HttpResponseStatus.CONTINUE.codeAsText().contentEquals(status)) {
90 final FullHttpMessage fullMsg = newFullMessage(id, headers, ctx.alloc());
91 ctx.fireChannelRead(fullMsg);
92 return;
93 }
94
95 if (headers.method() == null && status == null) {
96
97 LastHttpContent last = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders);
98 HttpConversionUtil.addHttp3ToHttpHeaders(id, headers, last.trailingHeaders(),
99 HttpVersion.HTTP_1_1, true, true);
100 inboundTranslationInProgress = false;
101 ctx.fireChannelRead(last);
102 } else {
103 HttpMessage req = newMessage(id, headers);
104 if (!HttpUtil.isContentLengthSet(req)) {
105 req.headers().add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
106 }
107 inboundTranslationInProgress = true;
108 ctx.fireChannelRead(req);
109 }
110 }
111
112 @Override
113 protected void channelRead(ChannelHandlerContext ctx, Http3DataFrame frame) throws Exception {
114 inboundTranslationInProgress = true;
115 ctx.fireChannelRead(new DefaultHttpContent(frame.content()));
116 }
117
118 @Override
119 protected void channelInputClosed(ChannelHandlerContext ctx) throws Exception {
120 if (inboundTranslationInProgress) {
121 ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
122 }
123 }
124
125
126
127
128
129
130
131
132
133
134
135 @Override
136 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
137 if (!(msg instanceof HttpObject)) {
138 throw new UnsupportedMessageTypeException(msg, HttpObject.class);
139 }
140
141
142 if (msg instanceof HttpResponse) {
143 final HttpResponse res = (HttpResponse) msg;
144 if (res.status().equals(HttpResponseStatus.CONTINUE)) {
145 if (res instanceof FullHttpResponse) {
146 final Http3Headers headers = toHttp3Headers(res);
147 ctx.write(new DefaultHttp3HeadersFrame(headers), promise);
148 ((FullHttpResponse) res).release();
149 return;
150 } else {
151 throw new EncoderException(
152 HttpResponseStatus.CONTINUE + " must be a FullHttpResponse");
153 }
154 }
155 }
156
157
158 PromiseCombiner combiner = null;
159
160
161
162 boolean isLast = msg instanceof LastHttpContent;
163
164 if (msg instanceof HttpMessage) {
165 Http3Headers headers = toHttp3Headers((HttpMessage) msg);
166 DefaultHttp3HeadersFrame frame = new DefaultHttp3HeadersFrame(headers);
167
168 if (msg instanceof HttpContent && (!promise.isVoid() || isLast)) {
169 combiner = new PromiseCombiner(ctx.executor());
170 }
171 promise = writeWithOptionalCombiner(ctx, frame, promise, combiner, isLast);
172 }
173
174 if (isLast) {
175 LastHttpContent last = (LastHttpContent) msg;
176 try {
177 boolean readable = last.content().isReadable();
178 boolean hasTrailers = !last.trailingHeaders().isEmpty();
179
180 if (combiner == null && readable && hasTrailers && !promise.isVoid()) {
181 combiner = new PromiseCombiner(ctx.executor());
182 }
183
184 if (readable) {
185 promise = writeWithOptionalCombiner(
186 ctx, new DefaultHttp3DataFrame(last.content().retain()), promise, combiner, true);
187 }
188 if (hasTrailers) {
189 Http3Headers headers = HttpConversionUtil.toHttp3Headers(last.trailingHeaders(), validateHeaders);
190 promise = writeWithOptionalCombiner(ctx,
191 new DefaultHttp3HeadersFrame(headers), promise, combiner, true);
192 } else if (!readable) {
193 if (combiner == null) {
194
195 promise = writeWithOptionalCombiner(
196 ctx, new DefaultHttp3DataFrame(last.content().retain()), promise, combiner, true);
197 }
198 }
199
200
201
202 promise = promise.unvoid().addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
203 } finally {
204
205 last.release();
206 }
207 } else if (msg instanceof HttpContent) {
208 promise = writeWithOptionalCombiner(ctx,
209 new DefaultHttp3DataFrame(((HttpContent) msg).content()), promise, combiner, false);
210 }
211
212 if (combiner != null) {
213 combiner.finish(promise);
214 }
215 }
216
217
218
219
220
221 private static ChannelPromise writeWithOptionalCombiner(
222 ChannelHandlerContext ctx,
223 Object msg,
224 ChannelPromise outerPromise,
225 @Nullable PromiseCombiner combiner,
226 boolean unvoidPromise
227 ) {
228 if (unvoidPromise) {
229 outerPromise = outerPromise.unvoid();
230 }
231 if (combiner == null) {
232 ctx.write(msg, outerPromise);
233 } else {
234 combiner.add(ctx.write(msg));
235 }
236 return outerPromise;
237 }
238
239 private Http3Headers toHttp3Headers(HttpMessage msg) {
240 if (msg instanceof HttpRequest) {
241 msg.headers().set(
242 HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), HttpScheme.HTTPS);
243 }
244
245 return HttpConversionUtil.toHttp3Headers(msg, validateHeaders);
246 }
247
248 private HttpMessage newMessage(final long id,
249 final Http3Headers headers) throws Http3Exception {
250 return isServer ?
251 HttpConversionUtil.toHttpRequest(id, headers, validateHeaders) :
252 HttpConversionUtil.toHttpResponse(id, headers, validateHeaders);
253 }
254
255 private FullHttpMessage newFullMessage(final long id,
256 final Http3Headers headers,
257 final ByteBufAllocator alloc) throws Http3Exception {
258 return isServer ?
259 HttpConversionUtil.toFullHttpRequest(id, headers, alloc, validateHeaders) :
260 HttpConversionUtil.toFullHttpResponse(id, headers, alloc, validateHeaders);
261 }
262
263 @Override
264 public void flush(ChannelHandlerContext ctx) {
265 ctx.flush();
266 }
267
268 @Override
269 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
270 ctx.bind(localAddress, promise);
271 }
272
273 @Override
274 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
275 SocketAddress localAddress, ChannelPromise promise) {
276 ctx.connect(remoteAddress, localAddress, promise);
277 }
278
279 @Override
280 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
281 ctx.disconnect(promise);
282 }
283
284 @Override
285 public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
286 ctx.close(promise);
287 }
288
289 @Override
290 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
291 ctx.deregister(promise);
292 }
293
294 @Override
295 public void read(ChannelHandlerContext ctx) throws Exception {
296 ctx.read();
297 }
298 }