View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.EventLoop;
27  import io.netty.channel.ServerChannel;
28  import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
29  import io.netty.util.ReferenceCounted;
30  import io.netty.util.internal.ObjectUtil;
31  import io.netty.util.internal.UnstableApi;
32  
33  import javax.net.ssl.SSLException;
34  import java.util.ArrayDeque;
35  import java.util.Queue;
36  
37  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
38  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
39  
40  /**
41   * An HTTP/2 handler that creates child channels for each stream. This handler must be used in combination
42   * with {@link Http2FrameCodec}.
43   *
44   * <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
45   * receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
46   * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
47   * the head of the pipeline are processed directly by this handler and cannot be intercepted.
48   *
49   * <p>The child channel will be notified of user events that impact the stream, such as {@link
50   * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
51   * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
52   * communication, closing of the channel is delayed until any inbound queue is drained with {@link
53   * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
54   * free to close the channel in response to such events if they don't have use for any queued
55   * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
56   * will be processed internally and also propagated down the pipeline for other handlers to act on.
57   *
58   * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
59   *
60   * <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
61   *
62   * <h3>Reference Counting</h3>
63   *
64   * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
65   * reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
66   * before propagating a reference counted object through the pipeline, and thus an application handler needs to release
67   * such an object after having consumed it. For more information on reference counting take a look at
68   * https://netty.io/wiki/reference-counted-objects.html
69   *
70   * <h3>Channel Events</h3>
71   *
72   * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
73   * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
74   * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
75   * (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
76   * indicating the cause and is closed immediately thereafter.
77   *
78   * <h3>Writability and Flow Control</h3>
79   *
80   * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
81   * when it maps to an active HTTP/2 stream. A child channel does not know about the connection-level flow control
82   * window. {@link ChannelHandler}s are free to ignore the channel's writability, in which case the excessive writes will
83   * be buffered by the parent channel. It's important to note that only {@link Http2DataFrame}s are subject to
84   * HTTP/2 flow control.
85   */
86  @UnstableApi
87  public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
88  
89      static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
90          @Override
91          public void operationComplete(ChannelFuture future) {
92              registerDone(future);
93          }
94      };
95  
96      private final ChannelHandler inboundStreamHandler;
97      private final ChannelHandler upgradeStreamHandler;
98      private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
99              new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
100                     // Choose 100 which is what is used most of the times as default.
101                     Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
102 
103     private boolean parentReadInProgress;
104     private int idCount;
105 
106     // Need to be volatile as accessed from within the Http2MultiplexHandlerStreamChannel in a multi-threaded fashion.
107     private volatile ChannelHandlerContext ctx;
108 
109     /**
110      * Creates a new instance
111      *
112      * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
113      *                             the {@link Channel}s created for new inbound streams.
114      */
115     public Http2MultiplexHandler(ChannelHandler inboundStreamHandler) {
116         this(inboundStreamHandler, null);
117     }
118 
119     /**
120      * Creates a new instance
121      *
122      * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
123      *                             the {@link Channel}s created for new inbound streams.
124      * @param upgradeStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of the
125      *                             upgraded {@link Channel}.
126      */
127     public Http2MultiplexHandler(ChannelHandler inboundStreamHandler, ChannelHandler upgradeStreamHandler) {
128         this.inboundStreamHandler = ObjectUtil.checkNotNull(inboundStreamHandler, "inboundStreamHandler");
129         this.upgradeStreamHandler = upgradeStreamHandler;
130     }
131 
132     static void registerDone(ChannelFuture future) {
133         // Handle any errors that occurred on the local thread while registering. Even though
134         // failures can happen after this point, they will be handled by the channel by closing the
135         // childChannel.
136         if (!future.isSuccess()) {
137             Channel childChannel = future.channel();
138             if (childChannel.isRegistered()) {
139                 childChannel.close();
140             } else {
141                 childChannel.unsafe().closeForcibly();
142             }
143         }
144     }
145 
146     @Override
147     protected void handlerAdded0(ChannelHandlerContext ctx) {
148         if (ctx.executor() != ctx.channel().eventLoop()) {
149             throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
150         }
151         this.ctx = ctx;
152     }
153 
154     @Override
155     protected void handlerRemoved0(ChannelHandlerContext ctx) {
156         readCompletePendingQueue.clear();
157     }
158 
159     @Override
160     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
161         parentReadInProgress = true;
162         if (msg instanceof Http2StreamFrame) {
163             if (msg instanceof Http2WindowUpdateFrame) {
164                 // We dont want to propagate update frames to the user
165                 return;
166             }
167             Http2StreamFrame streamFrame = (Http2StreamFrame) msg;
168             DefaultHttp2FrameStream s =
169                     (DefaultHttp2FrameStream) streamFrame.stream();
170 
171             AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) s.attachment;
172             if (msg instanceof Http2ResetFrame) {
173                 // Reset frames needs to be propagated via user events as these are not flow-controlled and so
174                 // must not be controlled by suppressing channel.read() on the child channel.
175                 channel.pipeline().fireUserEventTriggered(msg);
176 
177                 // RST frames will also trigger closing of the streams which then will call
178                 // AbstractHttp2StreamChannel.streamClosed()
179             } else {
180                 channel.fireChildRead(streamFrame);
181             }
182             return;
183         }
184 
185         if (msg instanceof Http2GoAwayFrame) {
186             // goaway frames will also trigger closing of the streams which then will call
187             // AbstractHttp2StreamChannel.streamClosed()
188             onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);
189         }
190 
191         // Send everything down the pipeline
192         ctx.fireChannelRead(msg);
193     }
194 
195     @Override
196     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
197         if (ctx.channel().isWritable()) {
198             // While the writability state may change during iterating of the streams we just set all of the streams
199             // to writable to not affect fairness. These will be "limited" by their own watermarks in any case.
200             forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
201         }
202 
203         ctx.fireChannelWritabilityChanged();
204     }
205 
206     @Override
207     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
208         if (evt instanceof Http2FrameStreamEvent) {
209             Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;
210             DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();
211             if (event.type() == Http2FrameStreamEvent.Type.State) {
212                 switch (stream.state()) {
213                     case HALF_CLOSED_LOCAL:
214                         if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
215                             // Ignore everything which was not caused by an upgrade
216                             break;
217                         }
218                         // fall-through
219                     case HALF_CLOSED_REMOTE:
220                         // fall-through
221                     case OPEN:
222                         if (stream.attachment != null) {
223                             // ignore if child channel was already created.
224                             break;
225                         }
226                         final AbstractHttp2StreamChannel ch;
227                         // We need to handle upgrades special when on the client side.
228                         if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {
229                             // We must have an upgrade handler or else we can't handle the stream
230                             if (upgradeStreamHandler == null) {
231                                 throw connectionError(INTERNAL_ERROR,
232                                         "Client is misconfigured for upgrade requests");
233                             }
234                             ch = new Http2MultiplexHandlerStreamChannel(stream, upgradeStreamHandler);
235                             ch.closeOutbound();
236                         } else {
237                             ch = new Http2MultiplexHandlerStreamChannel(stream, inboundStreamHandler);
238                         }
239                         ChannelFuture future = ctx.channel().eventLoop().register(ch);
240                         if (future.isDone()) {
241                             registerDone(future);
242                         } else {
243                             future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
244                         }
245                         break;
246                     case CLOSED:
247                         AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
248                         if (channel != null) {
249                             channel.streamClosed();
250                         }
251                         break;
252                     default:
253                         // ignore for now
254                         break;
255                 }
256             }
257             return;
258         }
259         ctx.fireUserEventTriggered(evt);
260     }
261 
262     // TODO: This is most likely not the best way to expose this, need to think more about it.
263     Http2StreamChannel newOutboundStream() {
264         return new Http2MultiplexHandlerStreamChannel((DefaultHttp2FrameStream) newStream(), null);
265     }
266 
267     @Override
268     public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
269         if (cause instanceof Http2FrameStreamException) {
270             Http2FrameStreamException exception = (Http2FrameStreamException) cause;
271             Http2FrameStream stream = exception.stream();
272             AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
273                     ((DefaultHttp2FrameStream) stream).attachment;
274             try {
275                 childChannel.pipeline().fireExceptionCaught(cause.getCause());
276             } finally {
277                 childChannel.unsafe().closeForcibly();
278             }
279             return;
280         }
281         if (cause.getCause() instanceof SSLException) {
282             forEachActiveStream(new Http2FrameStreamVisitor() {
283                 @Override
284                 public boolean visit(Http2FrameStream stream) {
285                     AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
286                             ((DefaultHttp2FrameStream) stream).attachment;
287                     childChannel.pipeline().fireExceptionCaught(cause);
288                     return true;
289                 }
290             });
291         }
292         ctx.fireExceptionCaught(cause);
293     }
294 
295     private static boolean isServer(ChannelHandlerContext ctx) {
296         return ctx.channel().parent() instanceof ServerChannel;
297     }
298 
299     private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
300         if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
301             // None of the streams can have an id greater than Integer.MAX_VALUE
302             return;
303         }
304         // Notify which streams were not processed by the remote peer and are safe to retry on another connection:
305         try {
306             final boolean server = isServer(ctx);
307             forEachActiveStream(new Http2FrameStreamVisitor() {
308                 @Override
309                 public boolean visit(Http2FrameStream stream) {
310                     final int streamId = stream.id();
311                     if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) {
312                         final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
313                                 ((DefaultHttp2FrameStream) stream).attachment;
314                         childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
315                     }
316                     return true;
317                 }
318             });
319         } catch (Http2Exception e) {
320             ctx.fireExceptionCaught(e);
321             ctx.close();
322         }
323     }
324 
325     /**
326      * Notifies any child streams of the read completion.
327      */
328     @Override
329     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
330         processPendingReadCompleteQueue();
331         ctx.fireChannelReadComplete();
332     }
333 
334     private void processPendingReadCompleteQueue() {
335         parentReadInProgress = true;
336         // If we have many child channel we can optimize for the case when multiple call flush() in
337         // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
338         // write calls on the socket which is expensive.
339         AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
340         if (childChannel != null) {
341             try {
342                 do {
343                     childChannel.fireChildReadComplete();
344                     childChannel = readCompletePendingQueue.poll();
345                 } while (childChannel != null);
346             } finally {
347                 parentReadInProgress = false;
348                 readCompletePendingQueue.clear();
349                 ctx.flush();
350             }
351         } else {
352             parentReadInProgress = false;
353         }
354     }
355 
356     private final class Http2MultiplexHandlerStreamChannel extends AbstractHttp2StreamChannel {
357 
358         Http2MultiplexHandlerStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
359             super(stream, ++idCount, inboundHandler);
360         }
361 
362         @Override
363         protected boolean isParentReadInProgress() {
364             return parentReadInProgress;
365         }
366 
367         @Override
368         protected void addChannelToReadCompletePendingQueue() {
369             // If there is no space left in the queue, just keep on processing everything that is already
370             // stored there and try again.
371             while (!readCompletePendingQueue.offer(this)) {
372                 processPendingReadCompleteQueue();
373             }
374         }
375 
376         @Override
377         protected ChannelHandlerContext parentContext() {
378             return ctx;
379         }
380     }
381 }