View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.http2;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.channel.Channel;
20  import io.netty5.channel.ChannelHandler;
21  import io.netty5.channel.ChannelHandlerContext;
22  import io.netty5.channel.ChannelOption;
23  import io.netty5.channel.ChannelPipeline;
24  import io.netty5.channel.EventLoop;
25  import io.netty5.channel.ServerChannel;
26  import io.netty5.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
27  import io.netty5.util.Resource;
28  import io.netty5.util.concurrent.Future;
29  import io.netty5.util.concurrent.FutureContextListener;
30  import io.netty5.util.internal.UnstableApi;
31  
32  import javax.net.ssl.SSLException;
33  import java.util.ArrayDeque;
34  import java.util.Objects;
35  import java.util.Queue;
36  
37  import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
38  import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
39  
40  /**
41   * An HTTP/2 handler that creates child channels for each stream. This handler must be used in combination
42   * with {@link Http2FrameCodec}.
43   *
44   * <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
45   * receive {@link Http2StreamFrame}s on the created channel. {@link Buffer}s cannot be processed by the channel;
46   * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
47   * the head of the pipeline are processed directly by this handler and cannot be intercepted.
48   *
49   * <p>The child channel will be notified of user events that impact the stream, such as {@link
50   * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
51   * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
52   * communication, closing of the channel is delayed until any inbound queue is drained with {@link
53   * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
54   * free to close the channel in response to such events if they don't have use for any queued
55   * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
56   * will be processed internally and also propagated down the pipeline for other handlers to act on.
57   *
58   * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
59   *
60   * <p>{@link ChannelOption#MAX_MESSAGES_PER_READ} and {@link ChannelOption#AUTO_READ} are supported.
61   *
62   * <h3>Resources</h3>
63   *
64   * Some {@link Http2StreamFrame}s implement the {@link Resource} interface, as they carry
65   * resource objects (e.g. {@link Buffer}s). An application handler needs to close or dispose of
66   * such objects after having consumed them.
67   *
68   * <h3>Channel Events</h3>
69   *
70   * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
71   * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
72   * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
73   * (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
74   * indicating the cause and is closed immediately thereafter.
75   *
76   * <h3>Writability and Flow Control</h3>
77   *
78   * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
79   * when it maps to an active HTTP/2 stream . A child channel does not know about the connection-level flow control
80   * window. {@link ChannelHandler}s are free to ignore the channel's writability, in which case the excessive writes will
81   * be buffered by the parent channel. It's important to note that only {@link Http2DataFrame}s are subject to
82   * HTTP/2 flow control.
83   */
84  @UnstableApi
85  public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
86  
87      private static final FutureContextListener<Channel, Void> CHILD_CHANNEL_REGISTRATION_LISTENER =
88              Http2MultiplexHandler::registerDone;
89  
90      private final ChannelHandler inboundStreamHandler;
91      private final ChannelHandler upgradeStreamHandler;
92      private final Queue<DefaultHttp2StreamChannel> readCompletePendingQueue =
93              new MaxCapacityQueue<>(new ArrayDeque<>(8),
94                      // Choose 100 which is what is used most of the times as default.
95                      Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
96  
97      private boolean parentReadInProgress;
98      private int idCount;
99  
100     // Need to be volatile as accessed from within the Http2MultiplexHandlerStreamChannel in a multi-threaded fashion.
101     private volatile ChannelHandlerContext ctx;
102 
103     /**
104      * Creates a new instance
105      *
106      * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
107      *                             the {@link Channel}s created for new inbound streams.
108      */
109     public Http2MultiplexHandler(ChannelHandler inboundStreamHandler) {
110         this(inboundStreamHandler, null);
111     }
112 
113     /**
114      * Creates a new instance
115      *
116      * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
117      *                             the {@link Channel}s created for new inbound streams.
118      * @param upgradeStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of the
119      *                             upgraded {@link Channel}.
120      */
121     public Http2MultiplexHandler(ChannelHandler inboundStreamHandler, ChannelHandler upgradeStreamHandler) {
122         this.inboundStreamHandler = Objects.requireNonNull(inboundStreamHandler, "inboundStreamHandler");
123         this.upgradeStreamHandler = upgradeStreamHandler;
124     }
125 
126     private static void registerDone(Channel childChannel, Future<?> future) {
127         // Handle any errors that occurred on the local thread while registering. Even though
128         // failures can happen after this point, they will be handled by the channel by closing the
129         // childChannel.
130         if (future.isFailed()) {
131             if (childChannel.isRegistered()) {
132                 childChannel.close();
133             } else {
134                 ((DefaultHttp2StreamChannel) childChannel).closeForcibly();
135             }
136         }
137     }
138 
139     @Override
140     protected void handlerAdded0(ChannelHandlerContext ctx) {
141         if (ctx.executor().inEventLoop() != ctx.channel().executor().inEventLoop()) {
142             throw new IllegalStateException("EventExecutor must be on the same thread as the EventLoop of the Channel");
143         }
144         this.ctx = ctx;
145     }
146 
147     @Override
148     protected void handlerRemoved0(ChannelHandlerContext ctx) {
149         readCompletePendingQueue.clear();
150     }
151 
152     @Override
153     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
154         parentReadInProgress = true;
155         if (msg instanceof Http2StreamFrame) {
156             if (msg instanceof Http2WindowUpdateFrame) {
157                 // We dont want to propagate update frames to the user
158                 return;
159             }
160             Http2StreamFrame streamFrame = (Http2StreamFrame) msg;
161             DefaultHttp2FrameStream s =
162                     (DefaultHttp2FrameStream) streamFrame.stream();
163 
164             DefaultHttp2StreamChannel channel = (DefaultHttp2StreamChannel) s.attachment;
165             if (msg instanceof Http2ResetFrame) {
166                 // Reset frames needs to be propagated via user events as these are not flow-controlled and so
167                 // must not be controlled by suppressing channel.read() on the child channel.
168                 channel.pipeline().fireChannelInboundEvent(msg);
169 
170                 // RST frames will also trigger closing of the streams which then will call
171                 // AbstractHttp2StreamChannel.streamClosed()
172             } else {
173                 channel.fireChildRead(streamFrame);
174             }
175             return;
176         }
177 
178         if (msg instanceof Http2GoAwayFrame) {
179             // goaway frames will also trigger closing of the streams which then will call
180             // AbstractHttp2StreamChannel.streamClosed()
181             onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);
182         }
183 
184         // Send everything down the pipeline
185         ctx.fireChannelRead(msg);
186     }
187 
188     @Override
189     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
190         if (ctx.channel().isWritable()) {
191             // While the writability state may change during iterating of the streams we just set all of the streams
192             // to writable to not affect fairness. These will be "limited" by their own watermarks in any case.
193             forEachActiveStream(DefaultHttp2StreamChannel.WRITABLE_VISITOR);
194         }
195 
196         ctx.fireChannelWritabilityChanged();
197     }
198 
199     @Override
200     public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) throws Exception {
201         if (evt instanceof Http2FrameStreamEvent) {
202             Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;
203             DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();
204             if (event.type() == Http2FrameStreamEvent.Type.State) {
205                 switch (stream.state()) {
206                     case HALF_CLOSED_LOCAL:
207                         // Ignore everything which was not caused by an upgrade
208                         if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
209                             break;
210                         }
211                         // fall-through
212                     case HALF_CLOSED_REMOTE:
213                         // fall-through
214                     case OPEN:
215                         createStreamChannelIfNeeded(stream);
216                         break;
217                     case CLOSED:
218                         DefaultHttp2StreamChannel channel = (DefaultHttp2StreamChannel) stream.attachment;
219                         if (channel != null) {
220                             channel.streamClosed();
221                         }
222                         break;
223                     default:
224                         // ignore for now
225                         break;
226                 }
227             }
228             return;
229         }
230         ctx.fireChannelInboundEvent(evt);
231     }
232 
233     private void createStreamChannelIfNeeded(DefaultHttp2FrameStream stream)
234             throws Http2Exception {
235         if (stream.attachment != null) {
236             // ignore if child channel was already created.
237             return;
238         }
239         final DefaultHttp2StreamChannel ch;
240         // We need to handle upgrades special when on the client side.
241         if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {
242             // We must have an upgrade handler or else we can't handle the stream
243             if (upgradeStreamHandler == null) {
244                 throw connectionError(INTERNAL_ERROR,
245                         "Client is misconfigured for upgrade requests");
246             }
247             ch = new DefaultHttp2StreamChannel(this, stream, ++idCount, upgradeStreamHandler);
248             ch.closeOutbound();
249         } else {
250             ch = new DefaultHttp2StreamChannel(this, stream, ++idCount, inboundStreamHandler);
251         }
252         Future<Void> future = ch.register();
253         if (future.isDone()) {
254             registerDone(ch, future);
255         } else {
256             future.addListener(ch, CHILD_CHANNEL_REGISTRATION_LISTENER);
257         }
258     }
259 
260     // TODO: This is most likely not the best way to expose this, need to think more about it.
261     Http2StreamChannel newOutboundStream() {
262         return new DefaultHttp2StreamChannel(this, (DefaultHttp2FrameStream) newStream(), ++idCount, null);
263     }
264 
265     @Override
266     public void channelExceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
267         if (cause instanceof Http2FrameStreamException) {
268             Http2FrameStreamException exception = (Http2FrameStreamException) cause;
269             Http2FrameStream stream = exception.stream();
270             DefaultHttp2StreamChannel childChannel = (DefaultHttp2StreamChannel)
271                     ((DefaultHttp2FrameStream) stream).attachment;
272             try {
273                 childChannel.pipeline().fireChannelExceptionCaught(cause.getCause());
274             } finally {
275                 childChannel.closeForcibly();
276             }
277             return;
278         }
279         if (cause.getCause() instanceof SSLException) {
280             forEachActiveStream(stream -> {
281                 DefaultHttp2StreamChannel childChannel = (DefaultHttp2StreamChannel)
282                         ((DefaultHttp2FrameStream) stream).attachment;
283                 childChannel.pipeline().fireChannelExceptionCaught(cause);
284                 return true;
285             });
286         }
287         ctx.fireChannelExceptionCaught(cause);
288     }
289 
290     private static boolean isServer(ChannelHandlerContext ctx) {
291         return ctx.channel().parent() instanceof ServerChannel;
292     }
293 
294     private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
295         if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
296             // None of the streams can have an id greater than Integer.MAX_VALUE
297             return;
298         }
299         // Notify which streams were not processed by the remote peer and are safe to retry on another connection:
300         try {
301             final boolean server = isServer(ctx);
302             forEachActiveStream(stream -> {
303                 final int streamId = stream.id();
304                 if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) {
305                     final DefaultHttp2StreamChannel childChannel = (DefaultHttp2StreamChannel)
306                             ((DefaultHttp2FrameStream) stream).attachment;
307                     childChannel.pipeline().fireChannelInboundEvent(goAwayFrame.copy());
308                 }
309                 return true;
310             });
311         } catch (Http2Exception e) {
312             ctx.fireChannelExceptionCaught(e);
313             ctx.close();
314         }
315     }
316 
317     /**
318      * Notifies any child streams of the read completion.
319      */
320     @Override
321     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
322         processPendingReadCompleteQueue();
323         ctx.fireChannelReadComplete();
324     }
325 
326     private void processPendingReadCompleteQueue() {
327         parentReadInProgress = true;
328         // If we have many child channel we can optimize for the case when multiple call flush() in
329         // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
330         // write calls on the socket which is expensive.
331         DefaultHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
332         if (childChannel != null) {
333             try {
334                 do {
335                     childChannel.fireChildReadComplete();
336                     childChannel = readCompletePendingQueue.poll();
337                 } while (childChannel != null);
338             } finally {
339                 parentReadInProgress = false;
340                 readCompletePendingQueue.clear();
341                 ctx.flush();
342             }
343         } else {
344             parentReadInProgress = false;
345         }
346     }
347 
348     boolean isParentReadInProgress() {
349         return parentReadInProgress;
350     }
351 
352     void addChannelToReadCompletePendingQueue(DefaultHttp2StreamChannel channel) {
353         // If there is no space left in the queue, just keep on processing everything that is already
354         // stored there and try again.
355         while (!readCompletePendingQueue.offer(channel)) {
356             processPendingReadCompleteQueue();
357         }
358     }
359 
360     ChannelHandlerContext parentContext() {
361         return ctx;
362     }
363 }