1 /*
2 * Copyright 2019 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.EventLoop;
27 import io.netty.channel.ServerChannel;
28 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
29 import io.netty.channel.socket.ChannelOutputShutdownEvent;
30 import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
31 import io.netty.handler.ssl.SslCloseCompletionEvent;
32 import io.netty.util.ReferenceCounted;
33 import io.netty.util.internal.ObjectUtil;
34
35 import java.util.ArrayDeque;
36 import java.util.Queue;
37 import javax.net.ssl.SSLException;
38
39 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR;
40 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR;
41 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.SSL_CLOSE_COMPLETION_EVENT_VISITOR;
42 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
43 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
44
45 /**
46 * An HTTP/2 handler that creates child channels for each stream. This handler must be used in combination
47 * with {@link Http2FrameCodec}.
48 *
49 * <p>When a new stream is created, a new {@link Http2StreamChannel} is created for it. Applications send and
50 * receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
51 * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
52 * the head of the pipeline are processed directly by this handler and cannot be intercepted.
53 *
54 * <p>The child channel will be notified of user events that impact the stream, such as {@link
55 * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
56 * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
57 * communication, closing of the channel is delayed until any inbound queue is drained with {@link
58 * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
59 * free to close the channel in response to such events if they don't have use for any queued
60 * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
61 * will be processed internally and also propagated down the pipeline for other handlers to act on.
62 *
63 * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
64 *
65 * <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
66 *
67 * <h3>Reference Counting</h3>
68 *
69 * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
70 * reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
71 * before propagating a reference counted object through the pipeline, and thus an application handler needs to release
72 * such an object after having consumed it. For more information on reference counting take a look at
73 * <a href="https://netty.io/wiki/reference-counted-objects.html">the reference counted docs.</a>
74 *
75 * <h3>Channel Events</h3>
76 *
77 * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
78 * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
79 * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
80 * (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
81 * indicating the cause and is closed immediately thereafter.
82 *
83 * <h3>Writability and Flow Control</h3>
84 *
85 * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
 * when it maps to an active HTTP/2 stream. A child channel does not know about the connection-level flow control
87 * window. {@link ChannelHandler}s are free to ignore the channel's writability, in which case the excessive writes will
88 * be buffered by the parent channel. It's important to note that only {@link Http2DataFrame}s are subject to
89 * HTTP/2 flow control.
90 *
91 * <h3>Closing a {@link Http2StreamChannel}</h3>
92 *
 * Once you close a {@link Http2StreamChannel} a {@link Http2ResetFrame} will be sent to the remote peer with
 * {@link Http2Error#CANCEL} if needed. If you want to close the stream with another {@link Http2Error} (due to
 * errors / limits) you should propagate a {@link Http2FrameStreamException} through the {@link ChannelPipeline}.
 * Once it reaches the end of the {@link ChannelPipeline} it will automatically close the {@link Http2StreamChannel}
 * and send a {@link Http2ResetFrame} with the unwrapped {@link Http2Error} set. Another possibility is to just
 * directly write a {@link Http2ResetFrame} to the {@link Http2StreamChannel}.
99 */
100 public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
101
102 static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = Http2MultiplexHandler::registerDone;
103
104 private final ChannelHandler inboundStreamHandler;
105 private final ChannelHandler upgradeStreamHandler;
106 private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
107 new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
108 // Choose 100 which is what is used most of the times as default.
109 Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
110
111 private boolean parentReadInProgress;
112 private int idCount;
113
114 // Need to be volatile as accessed from within the Http2MultiplexHandlerStreamChannel in a multi-threaded fashion.
115 private volatile ChannelHandlerContext ctx;
116
/**
 * Creates a new instance without a dedicated handler for upgrade streams.
 *
 * <p>Delegates to the two-argument constructor, passing {@code null} as the upgrade handler.
 *
 * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
 *                             the {@link Channel}s created for new inbound streams.
 */
public Http2MultiplexHandler(ChannelHandler inboundStreamHandler) {
    this(inboundStreamHandler, null);
}
126
127 /**
128 * Creates a new instance
129 *
130 * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
131 * the {@link Channel}s created for new inbound streams.
132 * @param upgradeStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of the
133 * upgraded {@link Channel}.
134 */
135 public Http2MultiplexHandler(ChannelHandler inboundStreamHandler, ChannelHandler upgradeStreamHandler) {
136 this.inboundStreamHandler = ObjectUtil.checkNotNull(inboundStreamHandler, "inboundStreamHandler");
137 this.upgradeStreamHandler = upgradeStreamHandler;
138 }
139
140 static void registerDone(ChannelFuture future) {
141 // Handle any errors that occurred on the local thread while registering. Even though
142 // failures can happen after this point, they will be handled by the channel by closing the
143 // childChannel.
144 if (!future.isSuccess()) {
145 Channel childChannel = future.channel();
146 if (childChannel.isRegistered()) {
147 childChannel.close();
148 } else {
149 childChannel.unsafe().closeForcibly();
150 }
151 }
152 }
153
154 @Override
155 protected void handlerAdded0(ChannelHandlerContext ctx) {
156 if (ctx.executor() != ctx.channel().eventLoop()) {
157 throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
158 }
159 this.ctx = ctx;
160 }
161
162 @Override
163 protected void handlerRemoved0(ChannelHandlerContext ctx) {
164 readCompletePendingQueue.clear();
165 }
166
167 @Override
168 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
169 parentReadInProgress = true;
170 if (msg instanceof Http2StreamFrame) {
171 if (msg instanceof Http2WindowUpdateFrame) {
172 // We dont want to propagate update frames to the user
173 return;
174 }
175 Http2StreamFrame streamFrame = (Http2StreamFrame) msg;
176 DefaultHttp2FrameStream s =
177 (DefaultHttp2FrameStream) streamFrame.stream();
178
179 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) s.attachment;
180 if (msg instanceof Http2ResetFrame || msg instanceof Http2PriorityFrame) {
181 // Reset and Priority frames needs to be propagated via user events as these are not flow-controlled and
182 // so must not be controlled by suppressing channel.read() on the child channel.
183 channel.pipeline().fireUserEventTriggered(msg);
184
185 // RST frames will also trigger closing of the streams which then will call
186 // AbstractHttp2StreamChannel.streamClosed()
187 } else {
188 channel.fireChildRead(streamFrame);
189 }
190 return;
191 }
192
193 if (msg instanceof Http2GoAwayFrame) {
194 // goaway frames will also trigger closing of the streams which then will call
195 // AbstractHttp2StreamChannel.streamClosed()
196 onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);
197 }
198
199 // Send everything down the pipeline
200 ctx.fireChannelRead(msg);
201 }
202
203 @Override
204 public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
205 if (ctx.channel().isWritable()) {
206 // While the writability state may change during iterating of the streams we just set all of the streams
207 // to writable to not affect fairness. These will be "limited" by their own watermarks in any case.
208 forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
209 }
210
211 ctx.fireChannelWritabilityChanged();
212 }
213
214 @Override
215 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
216 if (evt instanceof Http2FrameStreamEvent) {
217 Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;
218 DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();
219 if (event.type() == Http2FrameStreamEvent.Type.State) {
220 switch (stream.state()) {
221 case HALF_CLOSED_LOCAL:
222 if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
223 // Ignore everything which was not caused by an upgrade
224 break;
225 }
226 // fall-through
227 case HALF_CLOSED_REMOTE:
228 // fall-through
229 case OPEN:
230 if (stream.attachment != null) {
231 // ignore if child channel was already created.
232 break;
233 }
234 final AbstractHttp2StreamChannel ch;
235 // We need to handle upgrades special when on the client side.
236 if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {
237 // We must have an upgrade handler or else we can't handle the stream
238 if (upgradeStreamHandler == null) {
239 throw connectionError(INTERNAL_ERROR,
240 "Client is misconfigured for upgrade requests");
241 }
242 ch = new Http2MultiplexHandlerStreamChannel(stream, upgradeStreamHandler);
243 ch.closeOutbound();
244 } else {
245 ch = new Http2MultiplexHandlerStreamChannel(stream, inboundStreamHandler);
246 }
247 ChannelFuture future = ctx.channel().eventLoop().register(ch);
248 if (future.isDone()) {
249 registerDone(future);
250 } else {
251 future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
252 }
253 break;
254 case CLOSED:
255 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
256 if (channel != null) {
257 channel.streamClosed();
258 }
259 break;
260 default:
261 // ignore for now
262 break;
263 }
264 }
265 return;
266 }
267 if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
268 forEachActiveStream(CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR);
269 } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
270 forEachActiveStream(CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR);
271 } else if (evt == SslCloseCompletionEvent.SUCCESS) {
272 forEachActiveStream(SSL_CLOSE_COMPLETION_EVENT_VISITOR);
273 }
274 ctx.fireUserEventTriggered(evt);
275 }
276
277 // TODO: This is most likely not the best way to expose this, need to think more about it.
278 Http2StreamChannel newOutboundStream() {
279 return new Http2MultiplexHandlerStreamChannel((DefaultHttp2FrameStream) newStream(), null);
280 }
281
282 @Override
283 public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
284 if (cause instanceof Http2FrameStreamException) {
285 Http2FrameStreamException exception = (Http2FrameStreamException) cause;
286 Http2FrameStream stream = exception.stream();
287 AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
288 ((DefaultHttp2FrameStream) stream).attachment;
289 try {
290 childChannel.pipeline().fireExceptionCaught(cause.getCause());
291 } finally {
292 // Close with the correct error that causes this stream exception.
293 // See https://github.com/netty/netty/issues/13235#issuecomment-1441994672
294 childChannel.closeWithError(exception.error());
295 }
296 return;
297 }
298 if (cause instanceof Http2MultiplexActiveStreamsException) {
299 // Unwrap the cause that was used to create it and fire it for all the active streams.
300 fireExceptionCaughtForActiveStream(cause.getCause());
301 return;
302 }
303
304 if (cause.getCause() instanceof SSLException) {
305 fireExceptionCaughtForActiveStream(cause);
306 }
307 ctx.fireExceptionCaught(cause);
308 }
309
310 private void fireExceptionCaughtForActiveStream(final Throwable cause) throws Http2Exception {
311 forEachActiveStream(new Http2FrameStreamVisitor() {
312 @Override
313 public boolean visit(Http2FrameStream stream) {
314 AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
315 ((DefaultHttp2FrameStream) stream).attachment;
316 childChannel.pipeline().fireExceptionCaught(cause);
317 return true;
318 }
319 });
320 }
321
322 private static boolean isServer(ChannelHandlerContext ctx) {
323 return ctx.channel().parent() instanceof ServerChannel;
324 }
325
326 private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
327 if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
328 // None of the streams can have an id greater than Integer.MAX_VALUE
329 return;
330 }
331 // Notify which streams were not processed by the remote peer and are safe to retry on another connection:
332 try {
333 final boolean server = isServer(ctx);
334 forEachActiveStream(new Http2FrameStreamVisitor() {
335 @Override
336 public boolean visit(Http2FrameStream stream) {
337 final int streamId = stream.id();
338 if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) {
339 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
340 ((DefaultHttp2FrameStream) stream).attachment;
341 childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
342 }
343 return true;
344 }
345 });
346 } catch (Http2Exception e) {
347 ctx.fireExceptionCaught(e);
348 ctx.close();
349 }
350 }
351
352 /**
353 * Notifies any child streams of the read completion.
354 */
355 @Override
356 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
357 processPendingReadCompleteQueue();
358 ctx.fireChannelReadComplete();
359 }
360
361 private void processPendingReadCompleteQueue() {
362 parentReadInProgress = true;
363 // If we have many child channel we can optimize for the case when multiple call flush() in
364 // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
365 // write calls on the socket which is expensive.
366 AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
367 if (childChannel != null) {
368 try {
369 do {
370 childChannel.fireChildReadComplete();
371 childChannel = readCompletePendingQueue.poll();
372 } while (childChannel != null);
373 } finally {
374 parentReadInProgress = false;
375 readCompletePendingQueue.clear();
376 ctx.flush();
377 }
378 } else {
379 parentReadInProgress = false;
380 }
381 }
382
383 private final class Http2MultiplexHandlerStreamChannel extends AbstractHttp2StreamChannel {
384
385 Http2MultiplexHandlerStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
386 super(stream, ++idCount, inboundHandler);
387 }
388
389 @Override
390 protected boolean isParentReadInProgress() {
391 return parentReadInProgress;
392 }
393
394 @Override
395 protected void addChannelToReadCompletePendingQueue() {
396 // If there is no space left in the queue, just keep on processing everything that is already
397 // stored there and try again.
398 while (!readCompletePendingQueue.offer(this)) {
399 processPendingReadCompleteQueue();
400 }
401 }
402
403 @Override
404 protected ChannelHandlerContext parentContext() {
405 return ctx;
406 }
407 }
408 }