/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
15  package io.netty.handler.codec.http2;
16  
17  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
18  import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
19  import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
20  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
21  import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
22  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
23  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
24  import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
25  import static io.netty.util.internal.ObjectUtil.checkNotNull;
26  
27  import io.netty.buffer.ByteBuf;
28  import io.netty.channel.ChannelFuture;
29  import io.netty.channel.ChannelFutureListener;
30  import io.netty.channel.ChannelHandlerContext;
31  import io.netty.channel.ChannelPromise;
32  import io.netty.handler.codec.ByteToMessageDecoder;
33  import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
34  import io.netty.handler.codec.http2.Http2Exception.StreamException;
35  
36  import java.util.Collection;
37  import java.util.List;
38  
39  /**
40   * Provides the default implementation for processing inbound frame events and delegates to a
41   * {@link Http2FrameListener}
42   * <p>
43   * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
44   * <p>
45   * This interface enforces inbound flow control functionality through
46   * {@link Http2LocalFlowController}
47   */
48  public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager {
49      private final Http2ConnectionDecoder decoder;
50      private final Http2ConnectionEncoder encoder;
51      private ByteBuf clientPrefaceString;
52      private boolean prefaceSent;
53      private ChannelFutureListener closeListener;
54  
55      public Http2ConnectionHandler(boolean server, Http2FrameListener listener) {
56          this(new DefaultHttp2Connection(server), listener);
57      }
58  
59      public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener) {
60          this(connection, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter(), listener);
61      }
62  
63      public Http2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
64              Http2FrameWriter frameWriter, Http2FrameListener listener) {
65          this(DefaultHttp2ConnectionDecoder.newBuilder().connection(connection)
66                  .frameReader(frameReader).listener(listener),
67               DefaultHttp2ConnectionEncoder.newBuilder().connection(connection)
68                  .frameWriter(frameWriter));
69      }
70  
71      /**
72       * Constructor for pre-configured encoder and decoder builders. Just sets the {@code this} as the
73       * {@link Http2LifecycleManager} and builds them.
74       */
75      public Http2ConnectionHandler(Http2ConnectionDecoder.Builder decoderBuilder,
76              Http2ConnectionEncoder.Builder encoderBuilder) {
77          checkNotNull(decoderBuilder, "decoderBuilder");
78          checkNotNull(encoderBuilder, "encoderBuilder");
79  
80          if (encoderBuilder.lifecycleManager() != decoderBuilder.lifecycleManager()) {
81              throw new IllegalArgumentException("Encoder and Decoder must share a lifecycle manager");
82          } else if (encoderBuilder.lifecycleManager() == null) {
83              encoderBuilder.lifecycleManager(this);
84              decoderBuilder.lifecycleManager(this);
85          }
86  
87          // Build the encoder.
88          encoder = checkNotNull(encoderBuilder.build(), "encoder");
89  
90          // Build the decoder.
91          decoderBuilder.encoder(encoder);
92          decoder = checkNotNull(decoderBuilder.build(), "decoder");
93  
94          // Verify that the encoder and decoder use the same connection.
95          checkNotNull(encoder.connection(), "encoder.connection");
96          if (encoder.connection() != decoder.connection()) {
97              throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
98          }
99  
100         clientPrefaceString = clientPrefaceString(encoder.connection());
101     }
102 
103     public Http2Connection connection() {
104         return encoder.connection();
105     }
106 
107     public Http2ConnectionDecoder decoder() {
108         return decoder;
109     }
110 
111     public Http2ConnectionEncoder encoder() {
112         return encoder;
113     }
114 
115     /**
116      * Handles the client-side (cleartext) upgrade from HTTP to HTTP/2.
117      * Reserves local stream 1 for the HTTP/2 response.
118      */
119     public void onHttpClientUpgrade() throws Http2Exception {
120         if (connection().isServer()) {
121             throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
122         }
123         if (prefaceSent || decoder.prefaceReceived()) {
124             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is sent or received");
125         }
126 
127         // Create a local stream used for the HTTP cleartext upgrade.
128         connection().createLocalStream(HTTP_UPGRADE_STREAM_ID).open(true);
129     }
130 
131     /**
132      * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
133      * @param settings the settings for the remote endpoint.
134      */
135     public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
136         if (!connection().isServer()) {
137             throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
138         }
139         if (prefaceSent || decoder.prefaceReceived()) {
140             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is sent or received");
141         }
142 
143         // Apply the settings but no ACK is necessary.
144         encoder.remoteSettings(settings);
145 
146         // Create a stream in the half-closed state.
147         connection().createRemoteStream(HTTP_UPGRADE_STREAM_ID).open(true);
148     }
149 
150     @Override
151     public void channelActive(ChannelHandlerContext ctx) throws Exception {
152         // The channel just became active - send the connection preface to the remote endpoint.
153         sendPreface(ctx);
154         super.channelActive(ctx);
155     }
156 
157     @Override
158     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
159         // This handler was just added to the context. In case it was handled after
160         // the connection became active, send the connection preface now.
161         sendPreface(ctx);
162     }
163 
164     @Override
165     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
166         dispose();
167     }
168 
169     @Override
170     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
171         // Avoid NotYetConnectedException
172         if (!ctx.channel().isActive()) {
173             ctx.close(promise);
174             return;
175         }
176 
177         ChannelFuture future = writeGoAway(ctx, null);
178 
179         // If there are no active streams, close immediately after the send is complete.
180         // Otherwise wait until all streams are inactive.
181         if (connection().numActiveStreams() == 0) {
182             future.addListener(new ClosingChannelFutureListener(ctx, promise));
183         } else {
184             closeListener = new ClosingChannelFutureListener(ctx, promise);
185         }
186     }
187 
188     @Override
189     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
190         ChannelFuture future = ctx.newSucceededFuture();
191         final Collection<Http2Stream> streams = connection().activeStreams();
192         for (Http2Stream s : streams.toArray(new Http2Stream[streams.size()])) {
193             closeStream(s, future);
194         }
195         super.channelInactive(ctx);
196     }
197 
198     /**
199      * Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
200      */
201     @Override
202     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
203         if (getEmbeddedHttp2Exception(cause) != null) {
204             // Some exception in the causality chain is an Http2Exception - handle it.
205             onException(ctx, cause);
206         } else {
207             super.exceptionCaught(ctx, cause);
208         }
209     }
210 
211     /**
212      * Closes the local side of the given stream. If this causes the stream to be closed, adds a
213      * hook to close the channel after the given future completes.
214      *
215      * @param stream the stream to be half closed.
216      * @param future If closing, the future after which to close the channel.
217      */
218     @Override
219     public void closeLocalSide(Http2Stream stream, ChannelFuture future) {
220         switch (stream.state()) {
221             case HALF_CLOSED_LOCAL:
222             case OPEN:
223                 stream.closeLocalSide();
224                 break;
225             default:
226                 closeStream(stream, future);
227                 break;
228         }
229     }
230 
231     /**
232      * Closes the remote side of the given stream. If this causes the stream to be closed, adds a
233      * hook to close the channel after the given future completes.
234      *
235      * @param stream the stream to be half closed.
236      * @param future If closing, the future after which to close the channel.
237      */
238     @Override
239     public void closeRemoteSide(Http2Stream stream, ChannelFuture future) {
240         switch (stream.state()) {
241             case HALF_CLOSED_REMOTE:
242             case OPEN:
243                 stream.closeRemoteSide();
244                 break;
245             default:
246                 closeStream(stream, future);
247                 break;
248         }
249     }
250 
251     /**
252      * Closes the given stream and adds a hook to close the channel after the given future
253      * completes.
254      *
255      * @param stream the stream to be closed.
256      * @param future the future after which to close the channel.
257      */
258     @Override
259     public void closeStream(final Http2Stream stream, ChannelFuture future) {
260         stream.close();
261 
262         future.addListener(new ChannelFutureListener() {
263           @Override
264           public void operationComplete(ChannelFuture future) throws Exception {
265             // Deactivate this stream.
266             connection().deactivate(stream);
267 
268             // If this connection is closing and there are no longer any
269             // active streams, close after the current operation completes.
270             if (closeListener != null && connection().numActiveStreams() == 0) {
271                 closeListener.operationComplete(future);
272             }
273           }
274         });
275     }
276 
277     /**
278      * Central handler for all exceptions caught during HTTP/2 processing.
279      */
280     @Override
281     public void onException(ChannelHandlerContext ctx, Throwable cause) {
282         Http2Exception embedded = getEmbeddedHttp2Exception(cause);
283         if (isStreamError(embedded)) {
284             onStreamError(ctx, cause, (StreamException) embedded);
285         } else if (embedded instanceof CompositeStreamException) {
286             CompositeStreamException compositException = (CompositeStreamException) embedded;
287             for (StreamException streamException : compositException) {
288                 onStreamError(ctx, cause, streamException);
289             }
290         } else {
291             onConnectionError(ctx, cause, embedded);
292         }
293     }
294 
295     /**
296      * Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint. Once all
297      * streams are closed, the connection is shut down.
298      *
299      * @param ctx the channel context
300      * @param cause the exception that was caught
301      * @param http2Ex the {@link Http2Exception} that is embedded in the causality chain. This may
302      *            be {@code null} if it's an unknown exception.
303      */
304     protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, Http2Exception http2Ex) {
305         if (http2Ex == null) {
306             http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
307         }
308         writeGoAway(ctx, http2Ex).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
309     }
310 
311     /**
312      * Handler for a stream error. Sends a {@code RST_STREAM} frame to the remote endpoint and closes the
313      * stream.
314      *
315      * @param ctx the channel context
316      * @param cause the exception that was caught
317      * @param http2Ex the {@link StreamException} that is embedded in the causality chain.
318      */
319     protected void onStreamError(ChannelHandlerContext ctx, Throwable cause, StreamException http2Ex) {
320         writeRstStream(ctx, http2Ex.streamId(), http2Ex.error().code(), ctx.newPromise());
321     }
322 
323     protected Http2FrameWriter frameWriter() {
324         return encoder().frameWriter();
325     }
326 
327     /**
328      * Writes a {@code RST_STREAM} frame to the remote endpoint and updates the connection state appropriately.
329      */
330     @Override
331     public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
332             ChannelPromise promise) {
333         Http2Stream stream = connection().stream(streamId);
334         ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
335         ctx.flush();
336 
337         if (stream != null) {
338             stream.resetSent();
339             closeStream(stream, promise);
340         }
341 
342         return future;
343     }
344 
345     /**
346      * Sends a {@code GO_AWAY} frame to the remote endpoint and updates the connection state appropriately.
347      */
348     @Override
349     public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
350             ChannelPromise promise) {
351         Http2Connection connection = connection();
352         if (connection.isGoAway()) {
353             debugData.release();
354             return ctx.newSucceededFuture();
355         }
356 
357         ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
358         ctx.flush();
359 
360         connection.goAwaySent(lastStreamId);
361         return future;
362     }
363 
364     /**
365      * Sends a {@code GO_AWAY} frame appropriate for the given exception.
366      */
367     private ChannelFuture writeGoAway(ChannelHandlerContext ctx, Http2Exception cause) {
368         Http2Connection connection = connection();
369         if (connection.isGoAway()) {
370             return ctx.newSucceededFuture();
371         }
372 
373         // The connection isn't alredy going away, send the GO_AWAY frame now to start
374         // the process.
375         long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
376         ByteBuf debugData = Http2CodecUtil.toByteBuf(ctx, cause);
377         int lastKnownStream = connection.remote().lastStreamCreated();
378         return writeGoAway(ctx, lastKnownStream, errorCode, debugData, ctx.newPromise());
379     }
380 
381     @Override
382     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
383         try {
384             // Read the remaining of the client preface string if we haven't already.
385             // If this is a client endpoint, always returns true.
386             if (!readClientPrefaceString(in)) {
387                 // Still processing the client preface.
388                 return;
389             }
390 
391             decoder.decodeFrame(ctx, in, out);
392         } catch (Throwable e) {
393             onException(ctx, e);
394         }
395     }
396 
397     /**
398      * Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent.
399      */
400     private void sendPreface(final ChannelHandlerContext ctx) {
401         if (prefaceSent || !ctx.channel().isActive()) {
402             return;
403         }
404 
405         prefaceSent = true;
406 
407         if (!connection().isServer()) {
408             // Clients must send the preface string as the first bytes on the connection.
409             ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
410         }
411 
412         // Both client and server must send their initial settings.
413         encoder.writeSettings(ctx, decoder.localSettings(), ctx.newPromise()).addListener(
414                 ChannelFutureListener.CLOSE_ON_FAILURE);
415     }
416 
417     /**
418      * Disposes of all resources.
419      */
420     private void dispose() {
421         encoder.close();
422         decoder.close();
423         if (clientPrefaceString != null) {
424             clientPrefaceString.release();
425             clientPrefaceString = null;
426         }
427     }
428 
429     /**
430      * Decodes the client connection preface string from the input buffer.
431      *
432      * @return {@code true} if processing of the client preface string is complete. Since client preface strings can
433      *         only be received by servers, returns true immediately for client endpoints.
434      */
435     private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
436         if (clientPrefaceString == null) {
437             return true;
438         }
439 
440         int prefaceRemaining = clientPrefaceString.readableBytes();
441         int bytesRead = Math.min(in.readableBytes(), prefaceRemaining);
442 
443         // Read the portion of the input up to the length of the preface, if reached.
444         ByteBuf sourceSlice = in.readSlice(bytesRead);
445 
446         // Read the same number of bytes from the preface buffer.
447         ByteBuf prefaceSlice = clientPrefaceString.readSlice(bytesRead);
448 
449         // If the input so far doesn't match the preface, break the connection.
450         if (bytesRead == 0 || !prefaceSlice.equals(sourceSlice)) {
451             throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt.");
452         }
453 
454         if (!clientPrefaceString.isReadable()) {
455             // Entire preface has been read.
456             clientPrefaceString.release();
457             clientPrefaceString = null;
458             return true;
459         }
460         return false;
461     }
462 
463     /**
464      * Returns the client preface string if this is a client connection, otherwise returns {@code null}.
465      */
466     private static ByteBuf clientPrefaceString(Http2Connection connection) {
467         return connection.isServer() ? connectionPrefaceBuf() : null;
468     }
469 
470     /**
471      * Closes the channel when the future completes.
472      */
473     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
474         private final ChannelHandlerContext ctx;
475         private final ChannelPromise promise;
476 
477         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
478             this.ctx = ctx;
479             this.promise = promise;
480         }
481 
482         @Override
483         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
484             ctx.close(promise);
485         }
486     }
487 }