/*
 * Copyright 2016 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.EventLoop;
26  import io.netty.util.ReferenceCounted;
27  
28  import io.netty.util.internal.UnstableApi;
29  
30  import java.util.ArrayDeque;
31  import java.util.Queue;
32  
33  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
34  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
35  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
36  
37  /**
38   * An HTTP/2 handler that creates child channels for each stream.
39   *
40   * <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
41   * receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
42   * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
43   * the head of the pipeline are processed directly by this handler and cannot be intercepted.
44   *
45   * <p>The child channel will be notified of user events that impact the stream, such as {@link
46   * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
47   * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
48   * communication, closing of the channel is delayed until any inbound queue is drained with {@link
49   * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
50   * free to close the channel in response to such events if they don't have use for any queued
51   * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
52   * will be processed internally and also propagated down the pipeline for other handlers to act on.
53   *
54   * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
55   *
56   * <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
57   *
58   * <h3>Reference Counting</h3>
59   *
60   * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
61   * reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
62   * before propagating a reference counted object through the pipeline, and thus an application handler needs to release
63   * such an object after having consumed it. For more information on reference counting take a look at
64   * https://netty.io/wiki/reference-counted-objects.html
65   *
66   * <h3>Channel Events</h3>
67   *
68   * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
69   * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
70   * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
71   * (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
72   * indicating the cause and is closed immediately thereafter.
73   *
74   * <h3>Writability and Flow Control</h3>
75   *
76   * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
77   * when it maps to an active HTTP/2 stream and the stream's flow control window is greater than zero. A child channel
78   * does not know about the connection-level flow control window. {@link ChannelHandler}s are free to ignore the
79   * channel's writability, in which case the excessive writes will be buffered by the parent channel. It's important to
80   * note that only {@link Http2DataFrame}s are subject to HTTP/2 flow control.
81   *
82   * @deprecated use {@link Http2FrameCodecBuilder} together with {@link Http2MultiplexHandler}.
83   */
84  @Deprecated
85  @UnstableApi
86  public class Http2MultiplexCodec extends Http2FrameCodec {
87  
88      private final ChannelHandler inboundStreamHandler;
89      private final ChannelHandler upgradeStreamHandler;
90      private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
91              new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
92                      // Choose 100 which is what is used most of the times as default.
93                      Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
94  
95      private boolean parentReadInProgress;
96      private int idCount;
97  
98      // Need to be volatile as accessed from within the Http2MultiplexCodecStreamChannel in a multi-threaded fashion.
99      volatile ChannelHandlerContext ctx;
100 
101     Http2MultiplexCodec(Http2ConnectionEncoder encoder,
102                         Http2ConnectionDecoder decoder,
103                         Http2Settings initialSettings,
104                         ChannelHandler inboundStreamHandler,
105                         ChannelHandler upgradeStreamHandler, boolean decoupleCloseAndGoAway, boolean flushPreface) {
106         super(encoder, decoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
107         this.inboundStreamHandler = inboundStreamHandler;
108         this.upgradeStreamHandler = upgradeStreamHandler;
109     }
110 
111     @Override
112     public void onHttpClientUpgrade() throws Http2Exception {
113         // We must have an upgrade handler or else we can't handle the stream
114         if (upgradeStreamHandler == null) {
115             throw connectionError(INTERNAL_ERROR, "Client is misconfigured for upgrade requests");
116         }
117         // Creates the Http2Stream in the Connection.
118         super.onHttpClientUpgrade();
119     }
120 
121     @Override
122     public final void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
123         if (ctx.executor() != ctx.channel().eventLoop()) {
124             throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
125         }
126         this.ctx = ctx;
127     }
128 
129     @Override
130     public final void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
131         super.handlerRemoved0(ctx);
132 
133         readCompletePendingQueue.clear();
134     }
135 
136     @Override
137     final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
138         if (frame instanceof Http2StreamFrame) {
139             Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
140             AbstractHttp2StreamChannel channel  = (AbstractHttp2StreamChannel)
141                     ((DefaultHttp2FrameStream) streamFrame.stream()).attachment;
142             channel.fireChildRead(streamFrame);
143             return;
144         }
145         if (frame instanceof Http2GoAwayFrame) {
146             onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
147         }
148         // Send frames down the pipeline
149         ctx.fireChannelRead(frame);
150     }
151 
152     @Override
153     final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
154         switch (stream.state()) {
155             case HALF_CLOSED_LOCAL:
156                 if (stream.id() != HTTP_UPGRADE_STREAM_ID) {
157                     // Ignore everything which was not caused by an upgrade
158                     break;
159                 }
160                 // fall-through
161             case HALF_CLOSED_REMOTE:
162                 // fall-through
163             case OPEN:
164                 if (stream.attachment != null) {
165                     // ignore if child channel was already created.
166                     break;
167                 }
168                 final Http2MultiplexCodecStreamChannel streamChannel;
169                 // We need to handle upgrades special when on the client side.
170                 if (stream.id() == HTTP_UPGRADE_STREAM_ID && !connection().isServer()) {
171                     // Add our upgrade handler to the channel and then register the channel.
172                     // The register call fires the channelActive, etc.
173                     assert upgradeStreamHandler != null;
174                     streamChannel = new Http2MultiplexCodecStreamChannel(stream, upgradeStreamHandler);
175                     streamChannel.closeOutbound();
176                 } else {
177                     streamChannel = new Http2MultiplexCodecStreamChannel(stream, inboundStreamHandler);
178                 }
179                 ChannelFuture future = ctx.channel().eventLoop().register(streamChannel);
180                 if (future.isDone()) {
181                     Http2MultiplexHandler.registerDone(future);
182                 } else {
183                     future.addListener(Http2MultiplexHandler.CHILD_CHANNEL_REGISTRATION_LISTENER);
184                 }
185                 break;
186             case CLOSED:
187                 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
188                 if (channel != null) {
189                     channel.streamClosed();
190                 }
191                 break;
192             default:
193                 // ignore for now
194                 break;
195         }
196     }
197 
198     // TODO: This is most likely not the best way to expose this, need to think more about it.
199     final Http2StreamChannel newOutboundStream() {
200         return new Http2MultiplexCodecStreamChannel(newStream(), null);
201     }
202 
203     @Override
204     final void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
205         Http2FrameStream stream = cause.stream();
206         AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) ((DefaultHttp2FrameStream) stream).attachment;
207 
208         try {
209             channel.pipeline().fireExceptionCaught(cause.getCause());
210         } finally {
211             channel.unsafe().closeForcibly();
212         }
213     }
214 
215     private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
216         if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
217             // None of the streams can have an id greater than Integer.MAX_VALUE
218             return;
219         }
220         // Notify which streams were not processed by the remote peer and are safe to retry on another connection:
221         try {
222             forEachActiveStream(new Http2FrameStreamVisitor() {
223                 @Override
224                 public boolean visit(Http2FrameStream stream) {
225                     final int streamId = stream.id();
226                     AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel)
227                             ((DefaultHttp2FrameStream) stream).attachment;
228                     if (streamId > goAwayFrame.lastStreamId() && connection().local().isValidStreamId(streamId)) {
229                         channel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
230                     }
231                     return true;
232                 }
233             });
234         } catch (Http2Exception e) {
235             ctx.fireExceptionCaught(e);
236             ctx.close();
237         }
238     }
239 
240     /**
241      * Notifies any child streams of the read completion.
242      */
243     @Override
244     public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
245         processPendingReadCompleteQueue();
246         channelReadComplete0(ctx);
247     }
248 
249     private void processPendingReadCompleteQueue() {
250         parentReadInProgress = true;
251         try {
252             // If we have many child channel we can optimize for the case when multiple call flush() in
253             // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
254             // write calls on the socket which is expensive.
255             for (;;) {
256                 AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
257                 if (childChannel == null) {
258                     break;
259                 }
260                 childChannel.fireChildReadComplete();
261             }
262         } finally {
263             parentReadInProgress = false;
264             readCompletePendingQueue.clear();
265             // We always flush as this is what Http2ConnectionHandler does for now.
266             flush0(ctx);
267         }
268     }
269     @Override
270     public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
271         parentReadInProgress = true;
272         super.channelRead(ctx, msg);
273     }
274 
275     @Override
276     public final void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
277         if (ctx.channel().isWritable()) {
278             // While the writability state may change during iterating of the streams we just set all of the streams
279             // to writable to not affect fairness. These will be "limited" by their own watermarks in any case.
280             forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
281         }
282 
283         super.channelWritabilityChanged(ctx);
284     }
285 
286     final void flush0(ChannelHandlerContext ctx) {
287         flush(ctx);
288     }
289 
290     private final class Http2MultiplexCodecStreamChannel extends AbstractHttp2StreamChannel {
291 
292         Http2MultiplexCodecStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
293             super(stream, ++idCount, inboundHandler);
294         }
295 
296         @Override
297         protected boolean isParentReadInProgress() {
298             return parentReadInProgress;
299         }
300 
301         @Override
302         protected void addChannelToReadCompletePendingQueue() {
303             // If there is no space left in the queue, just keep on processing everything that is already
304             // stored there and try again.
305             while (!readCompletePendingQueue.offer(this)) {
306                 processPendingReadCompleteQueue();
307             }
308         }
309 
310         @Override
311         protected ChannelHandlerContext parentContext() {
312             return ctx;
313         }
314 
315         @Override
316         protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
317             ChannelPromise promise = ctx.newPromise();
318             Http2MultiplexCodec.this.write(ctx, msg, promise);
319             return promise;
320         }
321 
322         @Override
323         protected void flush0(ChannelHandlerContext ctx) {
324             Http2MultiplexCodec.this.flush0(ctx);
325         }
326     }
327 }