1 /*
2 * Copyright 2019 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty5.handler.codec.http2;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.channel.Channel;
20 import io.netty5.channel.ChannelHandler;
21 import io.netty5.channel.ChannelHandlerContext;
22 import io.netty5.channel.ChannelOption;
23 import io.netty5.channel.ChannelPipeline;
24 import io.netty5.channel.EventLoop;
25 import io.netty5.channel.ServerChannel;
26 import io.netty5.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
27 import io.netty5.util.Resource;
28 import io.netty5.util.concurrent.Future;
29 import io.netty5.util.concurrent.FutureContextListener;
30 import io.netty5.util.internal.UnstableApi;
31
32 import javax.net.ssl.SSLException;
33 import java.util.ArrayDeque;
34 import java.util.Objects;
35 import java.util.Queue;
36
37 import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
38 import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
39
40 /**
41 * An HTTP/2 handler that creates child channels for each stream. This handler must be used in combination
42 * with {@link Http2FrameCodec}.
43 *
44 * <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
45 * receive {@link Http2StreamFrame}s on the created channel. {@link Buffer}s cannot be processed by the channel;
46 * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
47 * the head of the pipeline are processed directly by this handler and cannot be intercepted.
48 *
49 * <p>The child channel will be notified of user events that impact the stream, such as {@link
50 * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
51 * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
52 * communication, closing of the channel is delayed until any inbound queue is drained with {@link
53 * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
54 * free to close the channel in response to such events if they don't have use for any queued
55 * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
56 * will be processed internally and also propagated down the pipeline for other handlers to act on.
57 *
58 * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
59 *
60 * <p>{@link ChannelOption#MAX_MESSAGES_PER_READ} and {@link ChannelOption#AUTO_READ} are supported.
61 *
62 * <h3>Resources</h3>
63 *
64 * Some {@link Http2StreamFrame}s implement the {@link Resource} interface, as they carry
65 * resource objects (e.g. {@link Buffer}s). An application handler needs to close or dispose of
66 * such objects after having consumed them.
67 *
68 * <h3>Channel Events</h3>
69 *
70 * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
71 * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
72 * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
73 * (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
74 * indicating the cause and is closed immediately thereafter.
75 *
76 * <h3>Writability and Flow Control</h3>
77 *
78 * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
79 * when it maps to an active HTTP/2 stream . A child channel does not know about the connection-level flow control
80 * window. {@link ChannelHandler}s are free to ignore the channel's writability, in which case the excessive writes will
81 * be buffered by the parent channel. It's important to note that only {@link Http2DataFrame}s are subject to
82 * HTTP/2 flow control.
83 */
84 @UnstableApi
85 public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
86
87 private static final FutureContextListener<Channel, Void> CHILD_CHANNEL_REGISTRATION_LISTENER =
88 Http2MultiplexHandler::registerDone;
89
90 private final ChannelHandler inboundStreamHandler;
91 private final ChannelHandler upgradeStreamHandler;
92 private final Queue<DefaultHttp2StreamChannel> readCompletePendingQueue =
93 new MaxCapacityQueue<>(new ArrayDeque<>(8),
94 // Choose 100 which is what is used most of the times as default.
95 Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
96
97 private boolean parentReadInProgress;
98 private int idCount;
99
100 // Need to be volatile as accessed from within the Http2MultiplexHandlerStreamChannel in a multi-threaded fashion.
101 private volatile ChannelHandlerContext ctx;
102
103 /**
104 * Creates a new instance
105 *
106 * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
107 * the {@link Channel}s created for new inbound streams.
108 */
109 public Http2MultiplexHandler(ChannelHandler inboundStreamHandler) {
110 this(inboundStreamHandler, null);
111 }
112
113 /**
114 * Creates a new instance
115 *
116 * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
117 * the {@link Channel}s created for new inbound streams.
118 * @param upgradeStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of the
119 * upgraded {@link Channel}.
120 */
121 public Http2MultiplexHandler(ChannelHandler inboundStreamHandler, ChannelHandler upgradeStreamHandler) {
122 this.inboundStreamHandler = Objects.requireNonNull(inboundStreamHandler, "inboundStreamHandler");
123 this.upgradeStreamHandler = upgradeStreamHandler;
124 }
125
126 private static void registerDone(Channel childChannel, Future<?> future) {
127 // Handle any errors that occurred on the local thread while registering. Even though
128 // failures can happen after this point, they will be handled by the channel by closing the
129 // childChannel.
130 if (future.isFailed()) {
131 if (childChannel.isRegistered()) {
132 childChannel.close();
133 } else {
134 ((DefaultHttp2StreamChannel) childChannel).closeForcibly();
135 }
136 }
137 }
138
139 @Override
140 protected void handlerAdded0(ChannelHandlerContext ctx) {
141 if (ctx.executor().inEventLoop() != ctx.channel().executor().inEventLoop()) {
142 throw new IllegalStateException("EventExecutor must be on the same thread as the EventLoop of the Channel");
143 }
144 this.ctx = ctx;
145 }
146
147 @Override
148 protected void handlerRemoved0(ChannelHandlerContext ctx) {
149 readCompletePendingQueue.clear();
150 }
151
152 @Override
153 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
154 parentReadInProgress = true;
155 if (msg instanceof Http2StreamFrame) {
156 if (msg instanceof Http2WindowUpdateFrame) {
157 // We dont want to propagate update frames to the user
158 return;
159 }
160 Http2StreamFrame streamFrame = (Http2StreamFrame) msg;
161 DefaultHttp2FrameStream s =
162 (DefaultHttp2FrameStream) streamFrame.stream();
163
164 DefaultHttp2StreamChannel channel = (DefaultHttp2StreamChannel) s.attachment;
165 if (msg instanceof Http2ResetFrame) {
166 // Reset frames needs to be propagated via user events as these are not flow-controlled and so
167 // must not be controlled by suppressing channel.read() on the child channel.
168 channel.pipeline().fireChannelInboundEvent(msg);
169
170 // RST frames will also trigger closing of the streams which then will call
171 // AbstractHttp2StreamChannel.streamClosed()
172 } else {
173 channel.fireChildRead(streamFrame);
174 }
175 return;
176 }
177
178 if (msg instanceof Http2GoAwayFrame) {
179 // goaway frames will also trigger closing of the streams which then will call
180 // AbstractHttp2StreamChannel.streamClosed()
181 onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);
182 }
183
184 // Send everything down the pipeline
185 ctx.fireChannelRead(msg);
186 }
187
188 @Override
189 public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
190 if (ctx.channel().isWritable()) {
191 // While the writability state may change during iterating of the streams we just set all of the streams
192 // to writable to not affect fairness. These will be "limited" by their own watermarks in any case.
193 forEachActiveStream(DefaultHttp2StreamChannel.WRITABLE_VISITOR);
194 }
195
196 ctx.fireChannelWritabilityChanged();
197 }
198
199 @Override
200 public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) throws Exception {
201 if (evt instanceof Http2FrameStreamEvent) {
202 Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;
203 DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();
204 if (event.type() == Http2FrameStreamEvent.Type.State) {
205 switch (stream.state()) {
206 case HALF_CLOSED_LOCAL:
207 // Ignore everything which was not caused by an upgrade
208 if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
209 break;
210 }
211 // fall-through
212 case HALF_CLOSED_REMOTE:
213 // fall-through
214 case OPEN:
215 createStreamChannelIfNeeded(stream);
216 break;
217 case CLOSED:
218 DefaultHttp2StreamChannel channel = (DefaultHttp2StreamChannel) stream.attachment;
219 if (channel != null) {
220 channel.streamClosed();
221 }
222 break;
223 default:
224 // ignore for now
225 break;
226 }
227 }
228 return;
229 }
230 ctx.fireChannelInboundEvent(evt);
231 }
232
233 private void createStreamChannelIfNeeded(DefaultHttp2FrameStream stream)
234 throws Http2Exception {
235 if (stream.attachment != null) {
236 // ignore if child channel was already created.
237 return;
238 }
239 final DefaultHttp2StreamChannel ch;
240 // We need to handle upgrades special when on the client side.
241 if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {
242 // We must have an upgrade handler or else we can't handle the stream
243 if (upgradeStreamHandler == null) {
244 throw connectionError(INTERNAL_ERROR,
245 "Client is misconfigured for upgrade requests");
246 }
247 ch = new DefaultHttp2StreamChannel(this, stream, ++idCount, upgradeStreamHandler);
248 ch.closeOutbound();
249 } else {
250 ch = new DefaultHttp2StreamChannel(this, stream, ++idCount, inboundStreamHandler);
251 }
252 Future<Void> future = ch.register();
253 if (future.isDone()) {
254 registerDone(ch, future);
255 } else {
256 future.addListener(ch, CHILD_CHANNEL_REGISTRATION_LISTENER);
257 }
258 }
259
260 // TODO: This is most likely not the best way to expose this, need to think more about it.
261 Http2StreamChannel newOutboundStream() {
262 return new DefaultHttp2StreamChannel(this, (DefaultHttp2FrameStream) newStream(), ++idCount, null);
263 }
264
265 @Override
266 public void channelExceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
267 if (cause instanceof Http2FrameStreamException) {
268 Http2FrameStreamException exception = (Http2FrameStreamException) cause;
269 Http2FrameStream stream = exception.stream();
270 DefaultHttp2StreamChannel childChannel = (DefaultHttp2StreamChannel)
271 ((DefaultHttp2FrameStream) stream).attachment;
272 try {
273 childChannel.pipeline().fireChannelExceptionCaught(cause.getCause());
274 } finally {
275 childChannel.closeForcibly();
276 }
277 return;
278 }
279 if (cause.getCause() instanceof SSLException) {
280 forEachActiveStream(stream -> {
281 DefaultHttp2StreamChannel childChannel = (DefaultHttp2StreamChannel)
282 ((DefaultHttp2FrameStream) stream).attachment;
283 childChannel.pipeline().fireChannelExceptionCaught(cause);
284 return true;
285 });
286 }
287 ctx.fireChannelExceptionCaught(cause);
288 }
289
290 private static boolean isServer(ChannelHandlerContext ctx) {
291 return ctx.channel().parent() instanceof ServerChannel;
292 }
293
294 private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
295 if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
296 // None of the streams can have an id greater than Integer.MAX_VALUE
297 return;
298 }
299 // Notify which streams were not processed by the remote peer and are safe to retry on another connection:
300 try {
301 final boolean server = isServer(ctx);
302 forEachActiveStream(stream -> {
303 final int streamId = stream.id();
304 if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) {
305 final DefaultHttp2StreamChannel childChannel = (DefaultHttp2StreamChannel)
306 ((DefaultHttp2FrameStream) stream).attachment;
307 childChannel.pipeline().fireChannelInboundEvent(goAwayFrame.copy());
308 }
309 return true;
310 });
311 } catch (Http2Exception e) {
312 ctx.fireChannelExceptionCaught(e);
313 ctx.close();
314 }
315 }
316
317 /**
318 * Notifies any child streams of the read completion.
319 */
320 @Override
321 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
322 processPendingReadCompleteQueue();
323 ctx.fireChannelReadComplete();
324 }
325
326 private void processPendingReadCompleteQueue() {
327 parentReadInProgress = true;
328 // If we have many child channel we can optimize for the case when multiple call flush() in
329 // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
330 // write calls on the socket which is expensive.
331 DefaultHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
332 if (childChannel != null) {
333 try {
334 do {
335 childChannel.fireChildReadComplete();
336 childChannel = readCompletePendingQueue.poll();
337 } while (childChannel != null);
338 } finally {
339 parentReadInProgress = false;
340 readCompletePendingQueue.clear();
341 ctx.flush();
342 }
343 } else {
344 parentReadInProgress = false;
345 }
346 }
347
348 boolean isParentReadInProgress() {
349 return parentReadInProgress;
350 }
351
352 void addChannelToReadCompletePendingQueue(DefaultHttp2StreamChannel channel) {
353 // If there is no space left in the queue, just keep on processing everything that is already
354 // stored there and try again.
355 while (!readCompletePendingQueue.offer(channel)) {
356 processPendingReadCompleteQueue();
357 }
358 }
359
360 ChannelHandlerContext parentContext() {
361 return ctx;
362 }
363 }