View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.EventLoop;
26  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
27  import io.netty.channel.socket.ChannelOutputShutdownEvent;
28  import io.netty.handler.ssl.SslCloseCompletionEvent;
29  import io.netty.util.ReferenceCounted;
30  
31  import java.util.ArrayDeque;
32  import java.util.Queue;
33  
34  import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR;
35  import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR;
36  import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.SSL_CLOSE_COMPLETION_EVENT_VISITOR;
37  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
38  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
39  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
40  
41  /**
42   * An HTTP/2 handler that creates child channels for each stream.
43   *
44   * <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
45   * receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
46   * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
47   * the head of the pipeline are processed directly by this handler and cannot be intercepted.
48   *
49   * <p>The child channel will be notified of user events that impact the stream, such as {@link
50   * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
51   * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
52   * communication, closing of the channel is delayed until any inbound queue is drained with {@link
53   * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
54   * free to close the channel in response to such events if they don't have use for any queued
55   * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
56   * will be processed internally and also propagated down the pipeline for other handlers to act on.
57   *
58   * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
59   *
60   * <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
61   *
62   * <h3>Reference Counting</h3>
63   *
64   * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
65   * reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
66   * before propagating a reference counted object through the pipeline, and thus an application handler needs to release
67   * such an object after having consumed it. For more information on reference counting take a look at
68   * https://netty.io/wiki/reference-counted-objects.html
69   *
70   * <h3>Channel Events</h3>
71   *
72   * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
73   * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
74   * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
 * (e.g. due to the maximum number of active streams being exceeded), the child channel receives an exception
76   * indicating the cause and is closed immediately thereafter.
77   *
78   * <h3>Writability and Flow Control</h3>
79   *
80   * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
81   * when it maps to an active HTTP/2 stream and the stream's flow control window is greater than zero. A child channel
82   * does not know about the connection-level flow control window. {@link ChannelHandler}s are free to ignore the
83   * channel's writability, in which case the excessive writes will be buffered by the parent channel. It's important to
84   * note that only {@link Http2DataFrame}s are subject to HTTP/2 flow control.
85   *
86   * @deprecated use {@link Http2FrameCodecBuilder} together with {@link Http2MultiplexHandler}.
87   */
88  @Deprecated
89  public class Http2MultiplexCodec extends Http2FrameCodec {
90  
    // Handler handed to the child channel of every stream that is not the client-side upgrade stream.
    private final ChannelHandler inboundStreamHandler;
    // Handler handed to the child channel of the client-side HTTP upgrade stream; may be null, in which case
    // onHttpClientUpgrade() rejects the upgrade.
    private final ChannelHandler upgradeStreamHandler;
    // Child channels that still have to be told about the read completion of the parent channel. The queue is
    // bounded; when it is full the already queued channels are processed to make room.
    private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
            new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
                    // Choose 100, which is the value most commonly used as the default.
                    Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);

    // true while the parent channel is reading or while the pending read-complete queue is being processed.
    private boolean parentReadInProgress;
    // Incremented for every child channel that is created; the new value is passed on as the channel's id.
    private int idCount;

    // Needs to be volatile as it is accessed from within the Http2MultiplexCodecStreamChannel in a
    // multi-threaded fashion.
    volatile ChannelHandlerContext ctx;
103 
104     Http2MultiplexCodec(Http2ConnectionEncoder encoder,
105                         Http2ConnectionDecoder decoder,
106                         Http2Settings initialSettings,
107                         ChannelHandler inboundStreamHandler,
108                         ChannelHandler upgradeStreamHandler, boolean decoupleCloseAndGoAway, boolean flushPreface) {
109         super(encoder, decoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
110         this.inboundStreamHandler = inboundStreamHandler;
111         this.upgradeStreamHandler = upgradeStreamHandler;
112     }
113 
114     @Override
115     public void onHttpClientUpgrade() throws Http2Exception {
116         // We must have an upgrade handler or else we can't handle the stream
117         if (upgradeStreamHandler == null) {
118             throw connectionError(INTERNAL_ERROR, "Client is misconfigured for upgrade requests");
119         }
120         // Creates the Http2Stream in the Connection.
121         super.onHttpClientUpgrade();
122     }
123 
124     @Override
125     public final void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
126         if (ctx.executor() != ctx.channel().eventLoop()) {
127             throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
128         }
129         this.ctx = ctx;
130     }
131 
132     @Override
133     public final void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
134         super.handlerRemoved0(ctx);
135 
136         readCompletePendingQueue.clear();
137     }
138 
139     @Override
140     final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
141         if (frame instanceof Http2StreamFrame) {
142             Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
143             AbstractHttp2StreamChannel channel  = (AbstractHttp2StreamChannel)
144                     ((DefaultHttp2FrameStream) streamFrame.stream()).attachment;
145             channel.fireChildRead(streamFrame);
146             return;
147         }
148         if (frame instanceof Http2GoAwayFrame) {
149             onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
150         }
151         // Send frames down the pipeline
152         ctx.fireChannelRead(frame);
153     }
154 
155     @Override
156     final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
157         switch (stream.state()) {
158             case HALF_CLOSED_LOCAL:
159                 if (stream.id() != HTTP_UPGRADE_STREAM_ID) {
160                     // Ignore everything which was not caused by an upgrade
161                     break;
162                 }
163                 // fall-through
164             case HALF_CLOSED_REMOTE:
165                 // fall-through
166             case OPEN:
167                 if (stream.attachment != null) {
168                     // ignore if child channel was already created.
169                     break;
170                 }
171                 final Http2MultiplexCodecStreamChannel streamChannel;
172                 // We need to handle upgrades special when on the client side.
173                 if (stream.id() == HTTP_UPGRADE_STREAM_ID && !connection().isServer()) {
174                     // Add our upgrade handler to the channel and then register the channel.
175                     // The register call fires the channelActive, etc.
176                     assert upgradeStreamHandler != null;
177                     streamChannel = new Http2MultiplexCodecStreamChannel(stream, upgradeStreamHandler);
178                     streamChannel.closeOutbound();
179                 } else {
180                     streamChannel = new Http2MultiplexCodecStreamChannel(stream, inboundStreamHandler);
181                 }
182                 ChannelFuture future = ctx.channel().eventLoop().register(streamChannel);
183                 if (future.isDone()) {
184                     Http2MultiplexHandler.registerDone(future);
185                 } else {
186                     future.addListener(Http2MultiplexHandler.CHILD_CHANNEL_REGISTRATION_LISTENER);
187                 }
188                 break;
189             case CLOSED:
190                 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
191                 if (channel != null) {
192                     channel.streamClosed();
193                 }
194                 break;
195             default:
196                 // ignore for now
197                 break;
198         }
199     }
200 
201     // TODO: This is most likely not the best way to expose this, need to think more about it.
202     final Http2StreamChannel newOutboundStream() {
203         return new Http2MultiplexCodecStreamChannel(newStream(), null);
204     }
205 
206     @Override
207     final void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
208         Http2FrameStream stream = cause.stream();
209         AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) ((DefaultHttp2FrameStream) stream).attachment;
210 
211         try {
212             channel.pipeline().fireExceptionCaught(cause.getCause());
213         } finally {
214             // Close with the correct error that causes this stream exception.
215             // See https://github.com/netty/netty/issues/13235#issuecomment-1441994672
216             channel.closeWithError(cause.error());
217         }
218     }
219 
220     private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
221         if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
222             // None of the streams can have an id greater than Integer.MAX_VALUE
223             return;
224         }
225         // Notify which streams were not processed by the remote peer and are safe to retry on another connection:
226         try {
227             forEachActiveStream(new Http2FrameStreamVisitor() {
228                 @Override
229                 public boolean visit(Http2FrameStream stream) {
230                     final int streamId = stream.id();
231                     AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel)
232                             ((DefaultHttp2FrameStream) stream).attachment;
233                     if (streamId > goAwayFrame.lastStreamId() && connection().local().isValidStreamId(streamId)) {
234                         channel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
235                     }
236                     return true;
237                 }
238             });
239         } catch (Http2Exception e) {
240             ctx.fireExceptionCaught(e);
241             ctx.close();
242         }
243     }
244 
245     /**
246      * Notifies any child streams of the read completion.
247      */
248     @Override
249     public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
250         processPendingReadCompleteQueue();
251         channelReadComplete0(ctx);
252     }
253 
254     private void processPendingReadCompleteQueue() {
255         parentReadInProgress = true;
256         try {
257             // If we have many child channel we can optimize for the case when multiple call flush() in
258             // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
259             // write calls on the socket which is expensive.
260             for (;;) {
261                 AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
262                 if (childChannel == null) {
263                     break;
264                 }
265                 childChannel.fireChildReadComplete();
266             }
267         } finally {
268             parentReadInProgress = false;
269             readCompletePendingQueue.clear();
270             // We always flush as this is what Http2ConnectionHandler does for now.
271             flush0(ctx);
272         }
273     }
274     @Override
275     public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
276         parentReadInProgress = true;
277         super.channelRead(ctx, msg);
278     }
279 
280     @Override
281     public final void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
282         if (ctx.channel().isWritable()) {
283             // While the writability state may change during iterating of the streams we just set all of the streams
284             // to writable to not affect fairness. These will be "limited" by their own watermarks in any case.
285             forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
286         }
287 
288         super.channelWritabilityChanged(ctx);
289     }
290 
291     @Override
292     final void onUserEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
293         if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
294             forEachActiveStream(CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR);
295         } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
296             forEachActiveStream(CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR);
297         } else if (evt == SslCloseCompletionEvent.SUCCESS) {
298             forEachActiveStream(SSL_CLOSE_COMPLETION_EVENT_VISITOR);
299         }
300         super.onUserEventTriggered(ctx, evt);
301     }
302 
303     final void flush0(ChannelHandlerContext ctx) {
304         flush(ctx);
305     }
306 
307     private final class Http2MultiplexCodecStreamChannel extends AbstractHttp2StreamChannel {
308 
309         Http2MultiplexCodecStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
310             super(stream, ++idCount, inboundHandler);
311         }
312 
313         @Override
314         protected boolean isParentReadInProgress() {
315             return parentReadInProgress;
316         }
317 
318         @Override
319         protected void addChannelToReadCompletePendingQueue() {
320             // If there is no space left in the queue, just keep on processing everything that is already
321             // stored there and try again.
322             while (!readCompletePendingQueue.offer(this)) {
323                 processPendingReadCompleteQueue();
324             }
325         }
326 
327         @Override
328         protected ChannelHandlerContext parentContext() {
329             return ctx;
330         }
331 
332         @Override
333         protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
334             ChannelPromise promise = ctx.newPromise();
335             Http2MultiplexCodec.this.write(ctx, msg, promise);
336             return promise;
337         }
338 
339         @Override
340         protected void flush0(ChannelHandlerContext ctx) {
341             Http2MultiplexCodec.this.flush0(ctx);
342         }
343     }
344 }