View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.EventLoop;
27  import io.netty.channel.ServerChannel;
28  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
29  import io.netty.channel.socket.ChannelOutputShutdownEvent;
30  import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
31  import io.netty.handler.ssl.SslCloseCompletionEvent;
32  import io.netty.util.ReferenceCounted;
33  import io.netty.util.internal.ObjectUtil;
34  import io.netty.util.internal.UnstableApi;
35  
36  import java.util.ArrayDeque;
37  import java.util.Queue;
38  import javax.net.ssl.SSLException;
39  
40  import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR;
41  import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR;
42  import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.SSL_CLOSE_COMPLETION_EVENT_VISITOR;
43  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
45  
46  /**
47   * An HTTP/2 handler that creates child channels for each stream. This handler must be used in combination
48   * with {@link Http2FrameCodec}.
49   *
50   * <p>When a new stream is created, a new {@link Http2StreamChannel} is created for it. Applications send and
51   * receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
52   * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
53   * the head of the pipeline are processed directly by this handler and cannot be intercepted.
54   *
55   * <p>The child channel will be notified of user events that impact the stream, such as {@link
56   * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
57   * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
58   * communication, closing of the channel is delayed until any inbound queue is drained with {@link
59   * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
60   * free to close the channel in response to such events if they don't have use for any queued
61   * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
62   * will be processed internally and also propagated down the pipeline for other handlers to act on.
63   *
64   * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
65   *
66   * <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
67   *
68   * <h3>Reference Counting</h3>
69   *
70   * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
71   * reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
72   * before propagating a reference counted object through the pipeline, and thus an application handler needs to release
73   * such an object after having consumed it. For more information on reference counting take a look at
74   * <a href="https://netty.io/wiki/reference-counted-objects.html">the reference counted docs.</a>
75   *
76   * <h3>Channel Events</h3>
77   *
78   * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
79   * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
80   * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
81   * (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
82   * indicating the cause and is closed immediately thereafter.
83   *
84   * <h3>Writability and Flow Control</h3>
85   *
86   * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
87   * when it maps to an active HTTP/2 stream . A child channel does not know about the connection-level flow control
88   * window. {@link ChannelHandler}s are free to ignore the channel's writability, in which case the excessive writes will
89   * be buffered by the parent channel. It's important to note that only {@link Http2DataFrame}s are subject to
90   * HTTP/2 flow control.
91   *
92   * <h3>Closing a {@link Http2StreamChannel}</h3>
93   *
94   * Once you close a {@link Http2StreamChannel} a {@link Http2ResetFrame} will be sent to the remote peer with
95   * {@link Http2Error#CANCEL} if needed. If you want to close the stream with another {@link Http2Error} (due
96   * errors / limits) you should propagate a {@link Http2FrameStreamException} through the {@link ChannelPipeline}.
97   * Once it reaches the end of the {@link ChannelPipeline} it will automatically close the {@link Http2StreamChannel}
98   * and send a {@link Http2ResetFrame} with the unwrapped {@link Http2Error} set. Another possibility is to just
99   * directly write a {@link Http2ResetFrame} to the {@link Http2StreamChannel}l.
100  */
101 @UnstableApi
102 public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
103 
104     static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
105         @Override
106         public void operationComplete(ChannelFuture future) {
107             registerDone(future);
108         }
109     };
110 
111     private final ChannelHandler inboundStreamHandler;
112     private final ChannelHandler upgradeStreamHandler;
113     private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
114             new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
115                     // Choose 100 which is what is used most of the times as default.
116                     Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
117 
118     private boolean parentReadInProgress;
119     private int idCount;
120 
121     // Need to be volatile as accessed from within the Http2MultiplexHandlerStreamChannel in a multi-threaded fashion.
122     private volatile ChannelHandlerContext ctx;
123 
124     /**
125      * Creates a new instance
126      *
127      * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
128      *                             the {@link Channel}s created for new inbound streams.
129      */
130     public Http2MultiplexHandler(ChannelHandler inboundStreamHandler) {
131         this(inboundStreamHandler, null);
132     }
133 
134     /**
135      * Creates a new instance
136      *
137      * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
138      *                             the {@link Channel}s created for new inbound streams.
139      * @param upgradeStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of the
140      *                             upgraded {@link Channel}.
141      */
142     public Http2MultiplexHandler(ChannelHandler inboundStreamHandler, ChannelHandler upgradeStreamHandler) {
143         this.inboundStreamHandler = ObjectUtil.checkNotNull(inboundStreamHandler, "inboundStreamHandler");
144         this.upgradeStreamHandler = upgradeStreamHandler;
145     }
146 
147     static void registerDone(ChannelFuture future) {
148         // Handle any errors that occurred on the local thread while registering. Even though
149         // failures can happen after this point, they will be handled by the channel by closing the
150         // childChannel.
151         if (!future.isSuccess()) {
152             Channel childChannel = future.channel();
153             if (childChannel.isRegistered()) {
154                 childChannel.close();
155             } else {
156                 childChannel.unsafe().closeForcibly();
157             }
158         }
159     }
160 
161     @Override
162     protected void handlerAdded0(ChannelHandlerContext ctx) {
163         if (ctx.executor() != ctx.channel().eventLoop()) {
164             throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
165         }
166         this.ctx = ctx;
167     }
168 
169     @Override
170     protected void handlerRemoved0(ChannelHandlerContext ctx) {
171         readCompletePendingQueue.clear();
172     }
173 
174     @Override
175     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
176         parentReadInProgress = true;
177         if (msg instanceof Http2StreamFrame) {
178             if (msg instanceof Http2WindowUpdateFrame) {
179                 // We dont want to propagate update frames to the user
180                 return;
181             }
182             Http2StreamFrame streamFrame = (Http2StreamFrame) msg;
183             DefaultHttp2FrameStream s =
184                     (DefaultHttp2FrameStream) streamFrame.stream();
185 
186             AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) s.attachment;
187             if (msg instanceof Http2ResetFrame) {
188                 // Reset frames needs to be propagated via user events as these are not flow-controlled and so
189                 // must not be controlled by suppressing channel.read() on the child channel.
190                 channel.pipeline().fireUserEventTriggered(msg);
191 
192                 // RST frames will also trigger closing of the streams which then will call
193                 // AbstractHttp2StreamChannel.streamClosed()
194             } else {
195                 channel.fireChildRead(streamFrame);
196             }
197             return;
198         }
199 
200         if (msg instanceof Http2GoAwayFrame) {
201             // goaway frames will also trigger closing of the streams which then will call
202             // AbstractHttp2StreamChannel.streamClosed()
203             onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);
204         }
205 
206         // Send everything down the pipeline
207         ctx.fireChannelRead(msg);
208     }
209 
210     @Override
211     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
212         if (ctx.channel().isWritable()) {
213             // While the writability state may change during iterating of the streams we just set all of the streams
214             // to writable to not affect fairness. These will be "limited" by their own watermarks in any case.
215             forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
216         }
217 
218         ctx.fireChannelWritabilityChanged();
219     }
220 
221     @Override
222     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
223         if (evt instanceof Http2FrameStreamEvent) {
224             Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;
225             DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();
226             if (event.type() == Http2FrameStreamEvent.Type.State) {
227                 switch (stream.state()) {
228                     case HALF_CLOSED_LOCAL:
229                         if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
230                             // Ignore everything which was not caused by an upgrade
231                             break;
232                         }
233                         // fall-through
234                     case HALF_CLOSED_REMOTE:
235                         // fall-through
236                     case OPEN:
237                         if (stream.attachment != null) {
238                             // ignore if child channel was already created.
239                             break;
240                         }
241                         final AbstractHttp2StreamChannel ch;
242                         // We need to handle upgrades special when on the client side.
243                         if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {
244                             // We must have an upgrade handler or else we can't handle the stream
245                             if (upgradeStreamHandler == null) {
246                                 throw connectionError(INTERNAL_ERROR,
247                                         "Client is misconfigured for upgrade requests");
248                             }
249                             ch = new Http2MultiplexHandlerStreamChannel(stream, upgradeStreamHandler);
250                             ch.closeOutbound();
251                         } else {
252                             ch = new Http2MultiplexHandlerStreamChannel(stream, inboundStreamHandler);
253                         }
254                         ChannelFuture future = ctx.channel().eventLoop().register(ch);
255                         if (future.isDone()) {
256                             registerDone(future);
257                         } else {
258                             future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
259                         }
260                         break;
261                     case CLOSED:
262                         AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
263                         if (channel != null) {
264                             channel.streamClosed();
265                         }
266                         break;
267                     default:
268                         // ignore for now
269                         break;
270                 }
271             }
272             return;
273         }
274         if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
275             forEachActiveStream(CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR);
276         } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
277             forEachActiveStream(CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR);
278         } else if (evt == SslCloseCompletionEvent.SUCCESS) {
279             forEachActiveStream(SSL_CLOSE_COMPLETION_EVENT_VISITOR);
280         }
281         ctx.fireUserEventTriggered(evt);
282     }
283 
284     // TODO: This is most likely not the best way to expose this, need to think more about it.
285     Http2StreamChannel newOutboundStream() {
286         return new Http2MultiplexHandlerStreamChannel((DefaultHttp2FrameStream) newStream(), null);
287     }
288 
289     @Override
290     public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
291         if (cause instanceof Http2FrameStreamException) {
292             Http2FrameStreamException exception = (Http2FrameStreamException) cause;
293             Http2FrameStream stream = exception.stream();
294             AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
295                     ((DefaultHttp2FrameStream) stream).attachment;
296             try {
297                 childChannel.pipeline().fireExceptionCaught(cause.getCause());
298             } finally {
299                 // Close with the correct error that causes this stream exception.
300                 // See https://github.com/netty/netty/issues/13235#issuecomment-1441994672
301                 childChannel.closeWithError(exception.error());
302             }
303             return;
304         }
305         if (cause instanceof Http2MultiplexActiveStreamsException) {
306             // Unwrap the cause that was used to create it and fire it for all the active streams.
307             fireExceptionCaughtForActiveStream(cause.getCause());
308             return;
309         }
310 
311         if (cause.getCause() instanceof SSLException) {
312             fireExceptionCaughtForActiveStream(cause);
313         }
314         ctx.fireExceptionCaught(cause);
315     }
316 
317     private void fireExceptionCaughtForActiveStream(final Throwable cause) throws Http2Exception {
318         forEachActiveStream(new Http2FrameStreamVisitor() {
319             @Override
320             public boolean visit(Http2FrameStream stream) {
321                 AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
322                         ((DefaultHttp2FrameStream) stream).attachment;
323                 childChannel.pipeline().fireExceptionCaught(cause);
324                 return true;
325             }
326         });
327     }
328 
329     private static boolean isServer(ChannelHandlerContext ctx) {
330         return ctx.channel().parent() instanceof ServerChannel;
331     }
332 
333     private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
334         if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
335             // None of the streams can have an id greater than Integer.MAX_VALUE
336             return;
337         }
338         // Notify which streams were not processed by the remote peer and are safe to retry on another connection:
339         try {
340             final boolean server = isServer(ctx);
341             forEachActiveStream(new Http2FrameStreamVisitor() {
342                 @Override
343                 public boolean visit(Http2FrameStream stream) {
344                     final int streamId = stream.id();
345                     if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) {
346                         final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
347                                 ((DefaultHttp2FrameStream) stream).attachment;
348                         childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
349                     }
350                     return true;
351                 }
352             });
353         } catch (Http2Exception e) {
354             ctx.fireExceptionCaught(e);
355             ctx.close();
356         }
357     }
358 
359     /**
360      * Notifies any child streams of the read completion.
361      */
362     @Override
363     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
364         processPendingReadCompleteQueue();
365         ctx.fireChannelReadComplete();
366     }
367 
368     private void processPendingReadCompleteQueue() {
369         parentReadInProgress = true;
370         // If we have many child channel we can optimize for the case when multiple call flush() in
371         // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
372         // write calls on the socket which is expensive.
373         AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
374         if (childChannel != null) {
375             try {
376                 do {
377                     childChannel.fireChildReadComplete();
378                     childChannel = readCompletePendingQueue.poll();
379                 } while (childChannel != null);
380             } finally {
381                 parentReadInProgress = false;
382                 readCompletePendingQueue.clear();
383                 ctx.flush();
384             }
385         } else {
386             parentReadInProgress = false;
387         }
388     }
389 
390     private final class Http2MultiplexHandlerStreamChannel extends AbstractHttp2StreamChannel {
391 
392         Http2MultiplexHandlerStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
393             super(stream, ++idCount, inboundHandler);
394         }
395 
396         @Override
397         protected boolean isParentReadInProgress() {
398             return parentReadInProgress;
399         }
400 
401         @Override
402         protected void addChannelToReadCompletePendingQueue() {
403             // If there is no space left in the queue, just keep on processing everything that is already
404             // stored there and try again.
405             while (!readCompletePendingQueue.offer(this)) {
406                 processPendingReadCompleteQueue();
407             }
408         }
409 
410         @Override
411         protected ChannelHandlerContext parentContext() {
412             return ctx;
413         }
414     }
415 }