View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.EventLoop;
27  import io.netty.channel.ServerChannel;
28  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
29  import io.netty.channel.socket.ChannelOutputShutdownEvent;
30  import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
31  import io.netty.handler.ssl.SslCloseCompletionEvent;
32  import io.netty.util.ReferenceCounted;
33  import io.netty.util.internal.ObjectUtil;
34  
35  import java.util.ArrayDeque;
36  import java.util.Queue;
37  import javax.net.ssl.SSLException;
38  
39  import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR;
40  import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR;
41  import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.SSL_CLOSE_COMPLETION_EVENT_VISITOR;
42  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
43  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
44  
45  /**
46   * An HTTP/2 handler that creates child channels for each stream. This handler must be used in combination
47   * with {@link Http2FrameCodec}.
48   *
49   * <p>When a new stream is created, a new {@link Http2StreamChannel} is created for it. Applications send and
50   * receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
51   * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
52   * the head of the pipeline are processed directly by this handler and cannot be intercepted.
53   *
54   * <p>The child channel will be notified of user events that impact the stream, such as {@link
55   * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
56   * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
57   * communication, closing of the channel is delayed until any inbound queue is drained with {@link
58   * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
59   * free to close the channel in response to such events if they don't have use for any queued
60   * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
61   * will be processed internally and also propagated down the pipeline for other handlers to act on.
62   *
63   * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
64   *
65   * <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
66   *
67   * <h3>Reference Counting</h3>
68   *
69   * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
70   * reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
71   * before propagating a reference counted object through the pipeline, and thus an application handler needs to release
72   * such an object after having consumed it. For more information on reference counting take a look at
73   * <a href="https://netty.io/wiki/reference-counted-objects.html">the reference counted docs.</a>
74   *
75   * <h3>Channel Events</h3>
76   *
77   * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
78   * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
79   * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
80   * (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
81   * indicating the cause and is closed immediately thereafter.
82   *
83   * <h3>Writability and Flow Control</h3>
84   *
85   * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
86   * when it maps to an active HTTP/2 stream . A child channel does not know about the connection-level flow control
87   * window. {@link ChannelHandler}s are free to ignore the channel's writability, in which case the excessive writes will
88   * be buffered by the parent channel. It's important to note that only {@link Http2DataFrame}s are subject to
89   * HTTP/2 flow control.
90   *
91   * <h3>Closing a {@link Http2StreamChannel}</h3>
92   *
93   * Once you close a {@link Http2StreamChannel} a {@link Http2ResetFrame} will be sent to the remote peer with
94   * {@link Http2Error#CANCEL} if needed. If you want to close the stream with another {@link Http2Error} (due
95   * errors / limits) you should propagate a {@link Http2FrameStreamException} through the {@link ChannelPipeline}.
96   * Once it reaches the end of the {@link ChannelPipeline} it will automatically close the {@link Http2StreamChannel}
97   * and send a {@link Http2ResetFrame} with the unwrapped {@link Http2Error} set. Another possibility is to just
98   * directly write a {@link Http2ResetFrame} to the {@link Http2StreamChannel}l.
99   */
100 public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
101 
102     static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
103         @Override
104         public void operationComplete(ChannelFuture future) {
105             registerDone(future);
106         }
107     };
108 
109     private final ChannelHandler inboundStreamHandler;
110     private final ChannelHandler upgradeStreamHandler;
111     private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
112             new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
113                     // Choose 100 which is what is used most of the times as default.
114                     Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
115 
116     private boolean parentReadInProgress;
117     private int idCount;
118 
119     // Need to be volatile as accessed from within the Http2MultiplexHandlerStreamChannel in a multi-threaded fashion.
120     private volatile ChannelHandlerContext ctx;
121 
122     /**
123      * Creates a new instance
124      *
125      * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
126      *                             the {@link Channel}s created for new inbound streams.
127      */
128     public Http2MultiplexHandler(ChannelHandler inboundStreamHandler) {
129         this(inboundStreamHandler, null);
130     }
131 
132     /**
133      * Creates a new instance
134      *
135      * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
136      *                             the {@link Channel}s created for new inbound streams.
137      * @param upgradeStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of the
138      *                             upgraded {@link Channel}.
139      */
140     public Http2MultiplexHandler(ChannelHandler inboundStreamHandler, ChannelHandler upgradeStreamHandler) {
141         this.inboundStreamHandler = ObjectUtil.checkNotNull(inboundStreamHandler, "inboundStreamHandler");
142         this.upgradeStreamHandler = upgradeStreamHandler;
143     }
144 
145     static void registerDone(ChannelFuture future) {
146         // Handle any errors that occurred on the local thread while registering. Even though
147         // failures can happen after this point, they will be handled by the channel by closing the
148         // childChannel.
149         if (!future.isSuccess()) {
150             Channel childChannel = future.channel();
151             if (childChannel.isRegistered()) {
152                 childChannel.close();
153             } else {
154                 childChannel.unsafe().closeForcibly();
155             }
156         }
157     }
158 
159     @Override
160     protected void handlerAdded0(ChannelHandlerContext ctx) {
161         if (ctx.executor() != ctx.channel().eventLoop()) {
162             throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
163         }
164         this.ctx = ctx;
165     }
166 
167     @Override
168     protected void handlerRemoved0(ChannelHandlerContext ctx) {
169         readCompletePendingQueue.clear();
170     }
171 
172     @Override
173     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
174         parentReadInProgress = true;
175         if (msg instanceof Http2StreamFrame) {
176             if (msg instanceof Http2WindowUpdateFrame) {
177                 // We dont want to propagate update frames to the user
178                 return;
179             }
180             Http2StreamFrame streamFrame = (Http2StreamFrame) msg;
181             DefaultHttp2FrameStream s =
182                     (DefaultHttp2FrameStream) streamFrame.stream();
183 
184             AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) s.attachment;
185             if (msg instanceof Http2ResetFrame || msg instanceof Http2PriorityFrame) {
186                 // Reset and Priority frames needs to be propagated via user events as these are not flow-controlled and
187                 // so must not be controlled by suppressing channel.read() on the child channel.
188                 channel.pipeline().fireUserEventTriggered(msg);
189 
190                 // RST frames will also trigger closing of the streams which then will call
191                 // AbstractHttp2StreamChannel.streamClosed()
192             } else {
193                 channel.fireChildRead(streamFrame);
194             }
195             return;
196         }
197 
198         if (msg instanceof Http2GoAwayFrame) {
199             // goaway frames will also trigger closing of the streams which then will call
200             // AbstractHttp2StreamChannel.streamClosed()
201             onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);
202         }
203 
204         // Send everything down the pipeline
205         ctx.fireChannelRead(msg);
206     }
207 
208     @Override
209     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
210         if (ctx.channel().isWritable()) {
211             // While the writability state may change during iterating of the streams we just set all of the streams
212             // to writable to not affect fairness. These will be "limited" by their own watermarks in any case.
213             forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
214         }
215 
216         ctx.fireChannelWritabilityChanged();
217     }
218 
219     @Override
220     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
221         if (evt instanceof Http2FrameStreamEvent) {
222             Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;
223             DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();
224             if (event.type() == Http2FrameStreamEvent.Type.State) {
225                 switch (stream.state()) {
226                     case HALF_CLOSED_LOCAL:
227                         if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
228                             // Ignore everything which was not caused by an upgrade
229                             break;
230                         }
231                         // fall-through
232                     case HALF_CLOSED_REMOTE:
233                         // fall-through
234                     case OPEN:
235                         if (stream.attachment != null) {
236                             // ignore if child channel was already created.
237                             break;
238                         }
239                         final AbstractHttp2StreamChannel ch;
240                         // We need to handle upgrades special when on the client side.
241                         if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {
242                             // We must have an upgrade handler or else we can't handle the stream
243                             if (upgradeStreamHandler == null) {
244                                 throw connectionError(INTERNAL_ERROR,
245                                         "Client is misconfigured for upgrade requests");
246                             }
247                             ch = new Http2MultiplexHandlerStreamChannel(stream, upgradeStreamHandler);
248                             ch.closeOutbound();
249                         } else {
250                             ch = new Http2MultiplexHandlerStreamChannel(stream, inboundStreamHandler);
251                         }
252                         ChannelFuture future = ctx.channel().eventLoop().register(ch);
253                         if (future.isDone()) {
254                             registerDone(future);
255                         } else {
256                             future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
257                         }
258                         break;
259                     case CLOSED:
260                         AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
261                         if (channel != null) {
262                             channel.streamClosed();
263                         }
264                         break;
265                     default:
266                         // ignore for now
267                         break;
268                 }
269             }
270             return;
271         }
272         if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
273             forEachActiveStream(CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR);
274         } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
275             forEachActiveStream(CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR);
276         } else if (evt == SslCloseCompletionEvent.SUCCESS) {
277             forEachActiveStream(SSL_CLOSE_COMPLETION_EVENT_VISITOR);
278         }
279         ctx.fireUserEventTriggered(evt);
280     }
281 
282     // TODO: This is most likely not the best way to expose this, need to think more about it.
283     Http2StreamChannel newOutboundStream() {
284         return new Http2MultiplexHandlerStreamChannel((DefaultHttp2FrameStream) newStream(), null);
285     }
286 
287     @Override
288     public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
289         if (cause instanceof Http2FrameStreamException) {
290             Http2FrameStreamException exception = (Http2FrameStreamException) cause;
291             Http2FrameStream stream = exception.stream();
292             AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
293                     ((DefaultHttp2FrameStream) stream).attachment;
294             try {
295                 childChannel.pipeline().fireExceptionCaught(cause.getCause());
296             } finally {
297                 // Close with the correct error that causes this stream exception.
298                 // See https://github.com/netty/netty/issues/13235#issuecomment-1441994672
299                 childChannel.closeWithError(exception.error());
300             }
301             return;
302         }
303         if (cause instanceof Http2MultiplexActiveStreamsException) {
304             // Unwrap the cause that was used to create it and fire it for all the active streams.
305             fireExceptionCaughtForActiveStream(cause.getCause());
306             return;
307         }
308 
309         if (cause.getCause() instanceof SSLException) {
310             fireExceptionCaughtForActiveStream(cause);
311         }
312         ctx.fireExceptionCaught(cause);
313     }
314 
315     private void fireExceptionCaughtForActiveStream(final Throwable cause) throws Http2Exception {
316         forEachActiveStream(new Http2FrameStreamVisitor() {
317             @Override
318             public boolean visit(Http2FrameStream stream) {
319                 AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
320                         ((DefaultHttp2FrameStream) stream).attachment;
321                 childChannel.pipeline().fireExceptionCaught(cause);
322                 return true;
323             }
324         });
325     }
326 
327     private static boolean isServer(ChannelHandlerContext ctx) {
328         return ctx.channel().parent() instanceof ServerChannel;
329     }
330 
331     private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
332         if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
333             // None of the streams can have an id greater than Integer.MAX_VALUE
334             return;
335         }
336         // Notify which streams were not processed by the remote peer and are safe to retry on another connection:
337         try {
338             final boolean server = isServer(ctx);
339             forEachActiveStream(new Http2FrameStreamVisitor() {
340                 @Override
341                 public boolean visit(Http2FrameStream stream) {
342                     final int streamId = stream.id();
343                     if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) {
344                         final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
345                                 ((DefaultHttp2FrameStream) stream).attachment;
346                         childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
347                     }
348                     return true;
349                 }
350             });
351         } catch (Http2Exception e) {
352             ctx.fireExceptionCaught(e);
353             ctx.close();
354         }
355     }
356 
357     /**
358      * Notifies any child streams of the read completion.
359      */
360     @Override
361     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
362         processPendingReadCompleteQueue();
363         ctx.fireChannelReadComplete();
364     }
365 
366     private void processPendingReadCompleteQueue() {
367         parentReadInProgress = true;
368         // If we have many child channel we can optimize for the case when multiple call flush() in
369         // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
370         // write calls on the socket which is expensive.
371         AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
372         if (childChannel != null) {
373             try {
374                 do {
375                     childChannel.fireChildReadComplete();
376                     childChannel = readCompletePendingQueue.poll();
377                 } while (childChannel != null);
378             } finally {
379                 parentReadInProgress = false;
380                 readCompletePendingQueue.clear();
381                 ctx.flush();
382             }
383         } else {
384             parentReadInProgress = false;
385         }
386     }
387 
388     private final class Http2MultiplexHandlerStreamChannel extends AbstractHttp2StreamChannel {
389 
390         Http2MultiplexHandlerStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
391             super(stream, ++idCount, inboundHandler);
392         }
393 
394         @Override
395         protected boolean isParentReadInProgress() {
396             return parentReadInProgress;
397         }
398 
399         @Override
400         protected void addChannelToReadCompletePendingQueue() {
401             // If there is no space left in the queue, just keep on processing everything that is already
402             // stored there and try again.
403             while (!readCompletePendingQueue.offer(this)) {
404                 processPendingReadCompleteQueue();
405             }
406         }
407 
408         @Override
409         protected ChannelHandlerContext parentContext() {
410             return ctx;
411         }
412     }
413 }