View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.EventLoop;
26  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
27  import io.netty.channel.socket.ChannelOutputShutdownEvent;
28  import io.netty.handler.ssl.SslCloseCompletionEvent;
29  import io.netty.util.ReferenceCounted;
30  import io.netty.util.internal.UnstableApi;
31  
32  import java.util.ArrayDeque;
33  import java.util.Queue;
34  
35  import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR;
36  import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR;
37  import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.SSL_CLOSE_COMPLETION_EVENT_VISITOR;
38  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
39  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
40  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
41  
42  /**
43   * An HTTP/2 handler that creates child channels for each stream.
44   *
45   * <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
46   * receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
47   * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
48   * the head of the pipeline are processed directly by this handler and cannot be intercepted.
49   *
50   * <p>The child channel will be notified of user events that impact the stream, such as {@link
51   * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
52   * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
53   * communication, closing of the channel is delayed until any inbound queue is drained with {@link
54   * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
55   * free to close the channel in response to such events if they don't have use for any queued
56   * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
57   * will be processed internally and also propagated down the pipeline for other handlers to act on.
58   *
59   * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
60   *
61   * <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
62   *
63   * <h3>Reference Counting</h3>
64   *
65   * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
66   * reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
67   * before propagating a reference counted object through the pipeline, and thus an application handler needs to release
68   * such an object after having consumed it. For more information on reference counting take a look at
69   * https://netty.io/wiki/reference-counted-objects.html
70   *
71   * <h3>Channel Events</h3>
72   *
73   * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
74   * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
75   * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
76   * (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
77   * indicating the cause and is closed immediately thereafter.
78   *
79   * <h3>Writability and Flow Control</h3>
80   *
81   * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
82   * when it maps to an active HTTP/2 stream and the stream's flow control window is greater than zero. A child channel
83   * does not know about the connection-level flow control window. {@link ChannelHandler}s are free to ignore the
84   * channel's writability, in which case the excessive writes will be buffered by the parent channel. It's important to
85   * note that only {@link Http2DataFrame}s are subject to HTTP/2 flow control.
86   *
87   * @deprecated use {@link Http2FrameCodecBuilder} together with {@link Http2MultiplexHandler}.
88   */
89  @Deprecated
90  @UnstableApi
91  public class Http2MultiplexCodec extends Http2FrameCodec {
92  
93      private final ChannelHandler inboundStreamHandler;
94      private final ChannelHandler upgradeStreamHandler;
95      private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
96              new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
97                      // Choose 100 which is what is used most of the times as default.
98                      Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
99  
100     private boolean parentReadInProgress;
101     private int idCount;
102 
103     // Need to be volatile as accessed from within the Http2MultiplexCodecStreamChannel in a multi-threaded fashion.
104     volatile ChannelHandlerContext ctx;
105 
106     Http2MultiplexCodec(Http2ConnectionEncoder encoder,
107                         Http2ConnectionDecoder decoder,
108                         Http2Settings initialSettings,
109                         ChannelHandler inboundStreamHandler,
110                         ChannelHandler upgradeStreamHandler, boolean decoupleCloseAndGoAway, boolean flushPreface) {
111         super(encoder, decoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
112         this.inboundStreamHandler = inboundStreamHandler;
113         this.upgradeStreamHandler = upgradeStreamHandler;
114     }
115 
116     @Override
117     public void onHttpClientUpgrade() throws Http2Exception {
118         // We must have an upgrade handler or else we can't handle the stream
119         if (upgradeStreamHandler == null) {
120             throw connectionError(INTERNAL_ERROR, "Client is misconfigured for upgrade requests");
121         }
122         // Creates the Http2Stream in the Connection.
123         super.onHttpClientUpgrade();
124     }
125 
126     @Override
127     public final void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
128         if (ctx.executor() != ctx.channel().eventLoop()) {
129             throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
130         }
131         this.ctx = ctx;
132     }
133 
134     @Override
135     public final void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
136         super.handlerRemoved0(ctx);
137 
138         readCompletePendingQueue.clear();
139     }
140 
141     @Override
142     final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
143         if (frame instanceof Http2StreamFrame) {
144             Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
145             AbstractHttp2StreamChannel channel  = (AbstractHttp2StreamChannel)
146                     ((DefaultHttp2FrameStream) streamFrame.stream()).attachment;
147             channel.fireChildRead(streamFrame);
148             return;
149         }
150         if (frame instanceof Http2GoAwayFrame) {
151             onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
152         }
153         // Send frames down the pipeline
154         ctx.fireChannelRead(frame);
155     }
156 
157     @Override
158     final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
159         switch (stream.state()) {
160             case HALF_CLOSED_LOCAL:
161                 if (stream.id() != HTTP_UPGRADE_STREAM_ID) {
162                     // Ignore everything which was not caused by an upgrade
163                     break;
164                 }
165                 // fall-through
166             case HALF_CLOSED_REMOTE:
167                 // fall-through
168             case OPEN:
169                 if (stream.attachment != null) {
170                     // ignore if child channel was already created.
171                     break;
172                 }
173                 final Http2MultiplexCodecStreamChannel streamChannel;
174                 // We need to handle upgrades special when on the client side.
175                 if (stream.id() == HTTP_UPGRADE_STREAM_ID && !connection().isServer()) {
176                     // Add our upgrade handler to the channel and then register the channel.
177                     // The register call fires the channelActive, etc.
178                     assert upgradeStreamHandler != null;
179                     streamChannel = new Http2MultiplexCodecStreamChannel(stream, upgradeStreamHandler);
180                     streamChannel.closeOutbound();
181                 } else {
182                     streamChannel = new Http2MultiplexCodecStreamChannel(stream, inboundStreamHandler);
183                 }
184                 ChannelFuture future = ctx.channel().eventLoop().register(streamChannel);
185                 if (future.isDone()) {
186                     Http2MultiplexHandler.registerDone(future);
187                 } else {
188                     future.addListener(Http2MultiplexHandler.CHILD_CHANNEL_REGISTRATION_LISTENER);
189                 }
190                 break;
191             case CLOSED:
192                 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
193                 if (channel != null) {
194                     channel.streamClosed();
195                 }
196                 break;
197             default:
198                 // ignore for now
199                 break;
200         }
201     }
202 
203     // TODO: This is most likely not the best way to expose this, need to think more about it.
204     final Http2StreamChannel newOutboundStream() {
205         return new Http2MultiplexCodecStreamChannel(newStream(), null);
206     }
207 
208     @Override
209     final void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
210         Http2FrameStream stream = cause.stream();
211         AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) ((DefaultHttp2FrameStream) stream).attachment;
212 
213         try {
214             channel.pipeline().fireExceptionCaught(cause.getCause());
215         } finally {
216             // Close with the correct error that causes this stream exception.
217             // See https://github.com/netty/netty/issues/13235#issuecomment-1441994672
218             channel.closeWithError(cause.error());
219         }
220     }
221 
222     private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
223         if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
224             // None of the streams can have an id greater than Integer.MAX_VALUE
225             return;
226         }
227         // Notify which streams were not processed by the remote peer and are safe to retry on another connection:
228         try {
229             forEachActiveStream(new Http2FrameStreamVisitor() {
230                 @Override
231                 public boolean visit(Http2FrameStream stream) {
232                     final int streamId = stream.id();
233                     AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel)
234                             ((DefaultHttp2FrameStream) stream).attachment;
235                     if (streamId > goAwayFrame.lastStreamId() && connection().local().isValidStreamId(streamId)) {
236                         channel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
237                     }
238                     return true;
239                 }
240             });
241         } catch (Http2Exception e) {
242             ctx.fireExceptionCaught(e);
243             ctx.close();
244         }
245     }
246 
247     /**
248      * Notifies any child streams of the read completion.
249      */
250     @Override
251     public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
252         processPendingReadCompleteQueue();
253         channelReadComplete0(ctx);
254     }
255 
256     private void processPendingReadCompleteQueue() {
257         parentReadInProgress = true;
258         try {
259             // If we have many child channel we can optimize for the case when multiple call flush() in
260             // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
261             // write calls on the socket which is expensive.
262             for (;;) {
263                 AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
264                 if (childChannel == null) {
265                     break;
266                 }
267                 childChannel.fireChildReadComplete();
268             }
269         } finally {
270             parentReadInProgress = false;
271             readCompletePendingQueue.clear();
272             // We always flush as this is what Http2ConnectionHandler does for now.
273             flush0(ctx);
274         }
275     }
276     @Override
277     public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
278         parentReadInProgress = true;
279         super.channelRead(ctx, msg);
280     }
281 
282     @Override
283     public final void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
284         if (ctx.channel().isWritable()) {
285             // While the writability state may change during iterating of the streams we just set all of the streams
286             // to writable to not affect fairness. These will be "limited" by their own watermarks in any case.
287             forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
288         }
289 
290         super.channelWritabilityChanged(ctx);
291     }
292 
293     @Override
294     final void onUserEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
295         if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
296             forEachActiveStream(CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR);
297         } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
298             forEachActiveStream(CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR);
299         } else if (evt == SslCloseCompletionEvent.SUCCESS) {
300             forEachActiveStream(SSL_CLOSE_COMPLETION_EVENT_VISITOR);
301         }
302         super.onUserEventTriggered(ctx, evt);
303     }
304 
305     final void flush0(ChannelHandlerContext ctx) {
306         flush(ctx);
307     }
308 
309     private final class Http2MultiplexCodecStreamChannel extends AbstractHttp2StreamChannel {
310 
311         Http2MultiplexCodecStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
312             super(stream, ++idCount, inboundHandler);
313         }
314 
315         @Override
316         protected boolean isParentReadInProgress() {
317             return parentReadInProgress;
318         }
319 
320         @Override
321         protected void addChannelToReadCompletePendingQueue() {
322             // If there is no space left in the queue, just keep on processing everything that is already
323             // stored there and try again.
324             while (!readCompletePendingQueue.offer(this)) {
325                 processPendingReadCompleteQueue();
326             }
327         }
328 
329         @Override
330         protected ChannelHandlerContext parentContext() {
331             return ctx;
332         }
333 
334         @Override
335         protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
336             ChannelPromise promise = ctx.newPromise();
337             Http2MultiplexCodec.this.write(ctx, msg, promise);
338             return promise;
339         }
340 
341         @Override
342         protected void flush0(ChannelHandlerContext ctx) {
343             Http2MultiplexCodec.this.flush0(ctx);
344         }
345     }
346 }