View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufUtil;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelOutboundHandler;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.handler.codec.ByteToMessageDecoder;
26  import io.netty.handler.codec.http.HttpResponseStatus;
27  import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28  import io.netty.handler.codec.http2.Http2Exception.StreamException;
29  import io.netty.util.CharsetUtil;
30  import io.netty.util.concurrent.ScheduledFuture;
31  import io.netty.util.internal.UnstableApi;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.net.SocketAddress;
36  import java.util.List;
37  import java.util.concurrent.TimeUnit;
38  
39  import static io.netty.buffer.ByteBufUtil.hexDump;
40  import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
41  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
42  import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
43  import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
44  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
45  import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
46  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
47  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
48  import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
49  import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
50  import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
51  import static io.netty.util.CharsetUtil.UTF_8;
52  import static io.netty.util.internal.ObjectUtil.checkNotNull;
53  import static java.lang.Math.min;
54  import static java.util.concurrent.TimeUnit.MILLISECONDS;
55  
56  /**
57   * Provides the default implementation for processing inbound frame events and delegates to a
58   * {@link Http2FrameListener}
59   * <p>
60   * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
61   * <p>
62   * This class enforces inbound flow control functionality through
63   * {@link Http2LocalFlowController}
64   */
65  @UnstableApi
66  public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
67                                                                              ChannelOutboundHandler {
68  
69      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
70  
71      private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
72              HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
73      private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
74          Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();
75  
76      private final Http2ConnectionDecoder decoder;
77      private final Http2ConnectionEncoder encoder;
78      private final Http2Settings initialSettings;
79      private ChannelFutureListener closeListener;
80      private BaseDecoder byteDecoder;
81      private long gracefulShutdownTimeoutMillis;
82  
83      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
84                                       Http2Settings initialSettings) {
85          this.initialSettings = checkNotNull(initialSettings, "initialSettings");
86          this.decoder = checkNotNull(decoder, "decoder");
87          this.encoder = checkNotNull(encoder, "encoder");
88          if (encoder.connection() != decoder.connection()) {
89              throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
90          }
91      }
92  
93      Http2ConnectionHandler(boolean server, Http2FrameWriter frameWriter, Http2FrameLogger frameLogger,
94                      Http2Settings initialSettings) {
95          this.initialSettings = checkNotNull(initialSettings, "initialSettings");
96  
97          Http2Connection connection = new DefaultHttp2Connection(server);
98  
99          Long maxHeaderListSize = initialSettings.maxHeaderListSize();
100         Http2FrameReader frameReader = new DefaultHttp2FrameReader(maxHeaderListSize == null ?
101                 new DefaultHttp2HeadersDecoder(true) :
102                 new DefaultHttp2HeadersDecoder(true, maxHeaderListSize));
103 
104         if (frameLogger != null) {
105             frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
106             frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
107         }
108         encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
109         decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader);
110     }
111 
112     /**
113      * Get the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
114      * the connection during the graceful shutdown process. Returns -1 if this connection is configured to wait
115      * indefinitely for all streams to close.
116      */
117     public long gracefulShutdownTimeoutMillis() {
118         return gracefulShutdownTimeoutMillis;
119     }
120 
121     /**
122      * Set the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
123      * the connection during the graceful shutdown process.
124      * @param gracefulShutdownTimeoutMillis the amount of time (in milliseconds) this endpoint will wait for all
125      * streams to be closed before closing the connection during the graceful shutdown process.
126      */
127     public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
128         if (gracefulShutdownTimeoutMillis < -1) {
129             throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
130                                                " (expected: -1 for indefinite or >= 0)");
131         }
132         this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
133     }
134 
135     public Http2Connection connection() {
136         return encoder.connection();
137     }
138 
139     public Http2ConnectionDecoder decoder() {
140         return decoder;
141     }
142 
143     public Http2ConnectionEncoder encoder() {
144         return encoder;
145     }
146 
147     private boolean prefaceSent() {
148         return byteDecoder != null && byteDecoder.prefaceSent();
149     }
150 
151     /**
152      * Handles the client-side (cleartext) upgrade from HTTP to HTTP/2.
153      * Reserves local stream 1 for the HTTP/2 response.
154      */
155     public void onHttpClientUpgrade() throws Http2Exception {
156         if (connection().isServer()) {
157             throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
158         }
159         if (!prefaceSent()) {
160             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
161             // calling this method.
162             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
163         }
164         if (decoder.prefaceReceived()) {
165             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
166         }
167 
168         // Create a local stream used for the HTTP cleartext upgrade.
169         connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
170     }
171 
172     /**
173      * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
174      * @param settings the settings for the remote endpoint.
175      */
176     public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
177         if (!connection().isServer()) {
178             throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
179         }
180         if (!prefaceSent()) {
181             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
182             // calling this method.
183             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
184         }
185         if (decoder.prefaceReceived()) {
186             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
187         }
188 
189         // Apply the settings but no ACK is necessary.
190         encoder.remoteSettings(settings);
191 
192         // Create a stream in the half-closed state.
193         connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
194     }
195 
196     @Override
197     public void flush(ChannelHandlerContext ctx) {
198         try {
199             // Trigger pending writes in the remote flow controller.
200             encoder.flowController().writePendingBytes();
201             ctx.flush();
202         } catch (Http2Exception e) {
203             onError(ctx, e);
204         } catch (Throwable cause) {
205             onError(ctx, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
206         }
207     }
208 
209     private abstract class BaseDecoder {
210         public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
211         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
212         public void channelActive(ChannelHandlerContext ctx) throws Exception { }
213 
214         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
215             // Connection has terminated, close the encoder and decoder.
216             encoder().close();
217             decoder().close();
218 
219             // We need to remove all streams (not just the active ones).
220             // See https://github.com/netty/netty/issues/4838.
221             connection().close(ctx.voidPromise());
222         }
223 
224         /**
225          * Determine if the HTTP/2 connection preface been sent.
226          */
227         public boolean prefaceSent() {
228             return true;
229         }
230     }
231 
232     private final class PrefaceDecoder extends BaseDecoder {
233         private ByteBuf clientPrefaceString;
234         private boolean prefaceSent;
235 
236         public PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
237             clientPrefaceString = clientPrefaceString(encoder.connection());
238             // This handler was just added to the context. In case it was handled after
239             // the connection became active, send the connection preface now.
240             sendPreface(ctx);
241         }
242 
243         @Override
244         public boolean prefaceSent() {
245             return prefaceSent;
246         }
247 
248         @Override
249         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
250             try {
251                 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
252                     // After the preface is read, it is time to hand over control to the post initialized decoder.
253                     byteDecoder = new FrameDecoder();
254                     byteDecoder.decode(ctx, in, out);
255                 }
256             } catch (Throwable e) {
257                 onError(ctx, e);
258             }
259         }
260 
261         @Override
262         public void channelActive(ChannelHandlerContext ctx) throws Exception {
263             // The channel just became active - send the connection preface to the remote endpoint.
264             sendPreface(ctx);
265         }
266 
267         @Override
268         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
269             cleanup();
270             super.channelInactive(ctx);
271         }
272 
273         /**
274          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
275          */
276         @Override
277         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
278             cleanup();
279         }
280 
281         /**
282          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
283          */
284         private void cleanup() {
285             if (clientPrefaceString != null) {
286                 clientPrefaceString.release();
287                 clientPrefaceString = null;
288             }
289         }
290 
291         /**
292          * Decodes the client connection preface string from the input buffer.
293          *
294          * @return {@code true} if processing of the client preface string is complete. Since client preface strings can
295          *         only be received by servers, returns true immediately for client endpoints.
296          */
297         private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
298             if (clientPrefaceString == null) {
299                 return true;
300             }
301 
302             int prefaceRemaining = clientPrefaceString.readableBytes();
303             int bytesRead = min(in.readableBytes(), prefaceRemaining);
304 
305             // If the input so far doesn't match the preface, break the connection.
306             if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
307                                                       clientPrefaceString, clientPrefaceString.readerIndex(),
308                                                       bytesRead)) {
309                 int maxSearch = 1024; // picked because 512 is too little, and 2048 too much
310                 int http1Index =
311                     ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
312                 if (http1Index != -1) {
313                     String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
314                     throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
315                 }
316                 String receivedBytes = hexDump(in, in.readerIndex(),
317                                                min(in.readableBytes(), clientPrefaceString.readableBytes()));
318                 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
319                                                       "Hex dump for received bytes: %s", receivedBytes);
320             }
321             in.skipBytes(bytesRead);
322             clientPrefaceString.skipBytes(bytesRead);
323 
324             if (!clientPrefaceString.isReadable()) {
325                 // Entire preface has been read.
326                 clientPrefaceString.release();
327                 clientPrefaceString = null;
328                 return true;
329             }
330             return false;
331         }
332 
333         /**
334          * Peeks at that the next frame in the buffer and verifies that it is a non-ack {@code SETTINGS} frame.
335          *
336          * @param in the inbound buffer.
337          * @return {@code} true if the next frame is a non-ack {@code SETTINGS} frame, {@code false} if more
338          * data is required before we can determine the next frame type.
339          * @throws Http2Exception thrown if the next frame is NOT a non-ack {@code SETTINGS} frame.
340          */
341         private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
342             if (in.readableBytes() < 5) {
343                 // Need more data before we can see the frame type for the first frame.
344                 return false;
345             }
346 
347             short frameType = in.getUnsignedByte(in.readerIndex() + 3);
348             short flags = in.getUnsignedByte(in.readerIndex() + 4);
349             if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {
350                 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
351                                                       "Hex dump for first 5 bytes: %s",
352                                       hexDump(in, in.readerIndex(), 5));
353             }
354             return true;
355         }
356 
357         /**
358          * Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent.
359          */
360         private void sendPreface(ChannelHandlerContext ctx) throws Exception {
361             if (prefaceSent || !ctx.channel().isActive()) {
362                 return;
363             }
364 
365             prefaceSent = true;
366 
367             final boolean isClient = !connection().isServer();
368             if (isClient) {
369                 // Clients must send the preface string as the first bytes on the connection.
370                 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
371             }
372 
373             // Both client and server must send their initial settings.
374             encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
375                     ChannelFutureListener.CLOSE_ON_FAILURE);
376 
377             if (isClient) {
378                 // If this handler is extended by the user and we directly fire the userEvent from this context then
379                 // the user will not see the event. We should fire the event starting with this handler so this class
380                 // (and extending classes) have a chance to process the event.
381                 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
382             }
383         }
384     }
385 
386     private final class FrameDecoder extends BaseDecoder {
387         @Override
388         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
389             try {
390                 decoder.decodeFrame(ctx, in, out);
391             } catch (Throwable e) {
392                 onError(ctx, e);
393             }
394         }
395     }
396 
397     @Override
398     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
399         // Initialize the encoder, decoder, flow controllers, and internal state.
400         encoder.lifecycleManager(this);
401         decoder.lifecycleManager(this);
402         encoder.flowController().channelHandlerContext(ctx);
403         decoder.flowController().channelHandlerContext(ctx);
404         byteDecoder = new PrefaceDecoder(ctx);
405     }
406 
407     @Override
408     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
409         if (byteDecoder != null) {
410             byteDecoder.handlerRemoved(ctx);
411             byteDecoder = null;
412         }
413     }
414 
415     @Override
416     public void channelActive(ChannelHandlerContext ctx) throws Exception {
417         if (byteDecoder == null) {
418             byteDecoder = new PrefaceDecoder(ctx);
419         }
420         byteDecoder.channelActive(ctx);
421         super.channelActive(ctx);
422     }
423 
424     @Override
425     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
426         // Call super class first, as this may result in decode being called.
427         super.channelInactive(ctx);
428         if (byteDecoder != null) {
429             byteDecoder.channelInactive(ctx);
430             byteDecoder = null;
431         }
432     }
433 
434     @Override
435     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
436         // Writability is expected to change while we are writing. We cannot allow this event to trigger reentering
437         // the allocation and write loop. Reentering the event loop will lead to over or illegal allocation.
438         try {
439             if (ctx.channel().isWritable()) {
440                 flush(ctx);
441             }
442             encoder.flowController().channelWritabilityChanged();
443         } finally {
444             super.channelWritabilityChanged(ctx);
445         }
446     }
447 
448     @Override
449     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
450         byteDecoder.decode(ctx, in, out);
451     }
452 
453     @Override
454     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
455         ctx.bind(localAddress, promise);
456     }
457 
458     @Override
459     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
460                         ChannelPromise promise) throws Exception {
461         ctx.connect(remoteAddress, localAddress, promise);
462     }
463 
464     @Override
465     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
466         ctx.disconnect(promise);
467     }
468 
469     @Override
470     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
471         promise = promise.unvoid();
472         // Avoid NotYetConnectedException
473         if (!ctx.channel().isActive()) {
474             ctx.close(promise);
475             return;
476         }
477 
478         // If the user has already sent a GO_AWAY frame they may be attempting to do a graceful shutdown which requires
479         // sending multiple GO_AWAY frames. We should only send a GO_AWAY here if one has not already been sent. If
480         // a GO_AWAY has been sent we send a empty buffer just so we can wait to close until all other data has been
481         // flushed to the OS.
482         // https://github.com/netty/netty/issues/5307
483         final ChannelFuture future = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null);
484         ctx.flush();
485         doGracefulShutdown(ctx, future, promise);
486     }
487 
488     private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, ChannelPromise promise) {
489         if (isGracefulShutdownComplete()) {
490             // If there are no active streams, close immediately after the GO_AWAY write completes.
491             future.addListener(new ClosingChannelFutureListener(ctx, promise));
492         } else {
493             // If there are active streams we should wait until they are all closed before closing the connection.
494             if (gracefulShutdownTimeoutMillis < 0) {
495                 closeListener = new ClosingChannelFutureListener(ctx, promise);
496             } else {
497                 closeListener = new ClosingChannelFutureListener(ctx, promise,
498                                                                  gracefulShutdownTimeoutMillis, MILLISECONDS);
499             }
500         }
501     }
502 
503     @Override
504     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
505         ctx.deregister(promise);
506     }
507 
508     @Override
509     public void read(ChannelHandlerContext ctx) throws Exception {
510         ctx.read();
511     }
512 
513     @Override
514     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
515         ctx.write(msg, promise);
516     }
517 
518     @Override
519     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
520         // Trigger flush after read on the assumption that flush is cheap if there is nothing to write and that
521         // for flow-control the read may release window that causes data to be written that can now be flushed.
522         try {
523             // First call channelReadComplete0(...) as this may produce more data that we want to flush
524             channelReadComplete0(ctx);
525         } finally {
526             flush(ctx);
527         }
528     }
529 
530     void channelReadComplete0(ChannelHandlerContext ctx) throws Exception {
531         super.channelReadComplete(ctx);
532     }
533 
534     /**
535      * Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
536      */
537     @Override
538     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
539         if (getEmbeddedHttp2Exception(cause) != null) {
540             // Some exception in the causality chain is an Http2Exception - handle it.
541             onError(ctx, cause);
542         } else {
543             super.exceptionCaught(ctx, cause);
544         }
545     }
546 
547     /**
548      * Closes the local side of the given stream. If this causes the stream to be closed, adds a
549      * hook to close the channel after the given future completes.
550      *
551      * @param stream the stream to be half closed.
552      * @param future If closing, the future after which to close the channel.
553      */
554     @Override
555     public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
556         switch (stream.state()) {
557             case HALF_CLOSED_LOCAL:
558             case OPEN:
559                 stream.closeLocalSide();
560                 break;
561             default:
562                 closeStream(stream, future);
563                 break;
564         }
565     }
566 
567     /**
568      * Closes the remote side of the given stream. If this causes the stream to be closed, adds a
569      * hook to close the channel after the given future completes.
570      *
571      * @param stream the stream to be half closed.
572      * @param future If closing, the future after which to close the channel.
573      */
574     @Override
575     public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
576         switch (stream.state()) {
577             case HALF_CLOSED_REMOTE:
578             case OPEN:
579                 stream.closeRemoteSide();
580                 break;
581             default:
582                 closeStream(stream, future);
583                 break;
584         }
585     }
586 
587     @Override
588     public void closeStream(final Http2Stream stream, ChannelFuture future) {
589         stream.close();
590 
591         if (future.isDone()) {
592             checkCloseConnection(future);
593         } else {
594             future.addListener(new ChannelFutureListener() {
595                 @Override
596                 public void operationComplete(ChannelFuture future) throws Exception {
597                     checkCloseConnection(future);
598                 }
599             });
600         }
601     }
602 
603     /**
604      * Central handler for all exceptions caught during HTTP/2 processing.
605      */
606     @Override
607     public void onError(ChannelHandlerContext ctx, Throwable cause) {
608         Http2Exception embedded = getEmbeddedHttp2Exception(cause);
609         if (isStreamError(embedded)) {
610             onStreamError(ctx, cause, (StreamException) embedded);
611         } else if (embedded instanceof CompositeStreamException) {
612             CompositeStreamException compositException = (CompositeStreamException) embedded;
613             for (StreamException streamException : compositException) {
614                 onStreamError(ctx, cause, streamException);
615             }
616         } else {
617             onConnectionError(ctx, cause, embedded);
618         }
619         ctx.flush();
620     }
621 
622     /**
623      * Called by the graceful shutdown logic to determine when it is safe to close the connection. Returns {@code true}
624      * if the graceful shutdown has completed and the connection can be safely closed. This implementation just
625      * guarantees that there are no active streams. Subclasses may override to provide additional checks.
626      */
627     protected boolean isGracefulShutdownComplete() {
628         return connection().numActiveStreams() == 0;
629     }
630 
631     /**
632      * Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint. Once all
633      * streams are closed, the connection is shut down.
634      *
635      * @param ctx the channel context
636      * @param cause the exception that was caught
637      * @param http2Ex the {@link Http2Exception} that is embedded in the causality chain. This may
638      *            be {@code null} if it's an unknown exception.
639      */
640     protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, Http2Exception http2Ex) {
641         if (http2Ex == null) {
642             http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
643         }
644 
645         ChannelPromise promise = ctx.newPromise();
646         ChannelFuture future = goAway(ctx, http2Ex);
647         switch (http2Ex.shutdownHint()) {
648         case GRACEFUL_SHUTDOWN:
649             doGracefulShutdown(ctx, future, promise);
650             break;
651         default:
652             future.addListener(new ClosingChannelFutureListener(ctx, promise));
653             break;
654         }
655     }
656 
657     /**
658      * Handler for a stream error. Sends a {@code RST_STREAM} frame to the remote endpoint and closes the
659      * stream.
660      *
661      * @param ctx the channel context
662      * @param cause the exception that was caught
663      * @param http2Ex the {@link StreamException} that is embedded in the causality chain.
664      */
665     protected void onStreamError(ChannelHandlerContext ctx, @SuppressWarnings("unused") Throwable cause,
666                                  StreamException http2Ex) {
667         final int streamId = http2Ex.streamId();
668         Http2Stream stream = connection().stream(streamId);
669 
670         //if this is caused by reading headers that are too large, send a header with status 431
671         if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
672             ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
673             connection().isServer()) {
674 
675             // NOTE We have to check to make sure that a stream exists before we send our reply.
676             // We likely always create the stream below as the stream isn't created until the
677             // header block is completely processed.
678 
679             // The case of a streamId referring to a stream which was already closed is handled
680             // by createStream and will land us in the catch block below
681             if (stream == null) {
682                 try {
683                     stream = encoder.connection().remote().createStream(streamId, true);
684                 } catch (Http2Exception e) {
685                     resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
686                     return;
687                 }
688             }
689 
690             // ensure that we have not already sent headers on this stream
691             if (stream != null && !stream.isHeadersSent()) {
692                 try {
693                     handleServerHeaderDecodeSizeError(ctx, stream);
694                 } catch (Throwable cause2) {
695                     onError(ctx, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
696                 }
697             }
698         }
699 
700         if (stream == null) {
701             resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
702         } else {
703             resetStream(ctx, stream, http2Ex.error().code(), ctx.newPromise());
704         }
705     }
706 
707     /**
708      * Notifies client that this server has received headers that are larger than what it is
709      * willing to accept. Override to change behavior.
710      *
711      * @param ctx the channel context
712      * @param stream the Http2Stream on which the header was received
713      */
714     protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
715         encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
716     }
717 
718     protected Http2FrameWriter frameWriter() {
719         return encoder().frameWriter();
720     }
721 
722     /**
723      * Sends a {@code RST_STREAM} frame even if we don't know about the stream. This error condition is most likely
724      * triggered by the first frame of a stream being invalid. That is, there was an error reading the frame before
725      * we could create a new stream.
726      */
727     private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
728                                              ChannelPromise promise) {
729         ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
730         if (future.isDone()) {
731             closeConnectionOnError(ctx, future);
732         } else {
733             future.addListener(new ChannelFutureListener() {
734                 @Override
735                 public void operationComplete(ChannelFuture future) throws Exception {
736                     closeConnectionOnError(ctx, future);
737                 }
738             });
739         }
740         return future;
741     }
742 
743     @Override
744     public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
745                                      ChannelPromise promise) {
746         final Http2Stream stream = connection().stream(streamId);
747         if (stream == null) {
748             return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
749         }
750 
751        return resetStream(ctx, stream, errorCode, promise);
752     }
753 
754     private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
755                                       long errorCode, ChannelPromise promise) {
756         promise = promise.unvoid();
757         if (stream.isResetSent()) {
758             // Don't write a RST_STREAM frame if we have already written one.
759             return promise.setSuccess();
760         }
761         final ChannelFuture future;
762         // If the remote peer is not aware of the steam, then we are not allowed to send a RST_STREAM
763         // https://tools.ietf.org/html/rfc7540#section-6.4.
764         if (stream.state() == IDLE ||
765             connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
766             future = promise.setSuccess();
767         } else {
768             future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
769         }
770 
771         // Synchronously set the resetSent flag to prevent any subsequent calls
772         // from resulting in multiple reset frames being sent.
773         stream.resetSent();
774 
775         if (future.isDone()) {
776             processRstStreamWriteResult(ctx, stream, future);
777         } else {
778             future.addListener(new ChannelFutureListener() {
779                 @Override
780                 public void operationComplete(ChannelFuture future) throws Exception {
781                     processRstStreamWriteResult(ctx, stream, future);
782                 }
783             });
784         }
785 
786         return future;
787     }
788 
789     @Override
790     public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
791                                 final ByteBuf debugData, ChannelPromise promise) {
792         try {
793             promise = promise.unvoid();
794             final Http2Connection connection = connection();
795             if (connection().goAwaySent()) {
796                 // Protect against re-entrancy. Could happen if writing the frame fails, and error handling
797                 // treating this is a connection handler and doing a graceful shutdown...
798                 if (lastStreamId == connection().remote().lastStreamKnownByPeer()) {
799                     // Release the data and notify the promise
800                     debugData.release();
801                     return promise.setSuccess();
802                 }
803                 if (lastStreamId > connection.remote().lastStreamKnownByPeer()) {
804                     throw connectionError(PROTOCOL_ERROR, "Last stream identifier must not increase between " +
805                                                           "sending multiple GOAWAY frames (was '%d', is '%d').",
806                                           connection.remote().lastStreamKnownByPeer(), lastStreamId);
807                 }
808             }
809 
810             connection.goAwaySent(lastStreamId, errorCode, debugData);
811 
812             // Need to retain before we write the buffer because if we do it after the refCnt could already be 0 and
813             // result in an IllegalRefCountException.
814             debugData.retain();
815             ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
816 
817             if (future.isDone()) {
818                 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
819             } else {
820                 future.addListener(new ChannelFutureListener() {
821                     @Override
822                     public void operationComplete(ChannelFuture future) throws Exception {
823                         processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
824                     }
825                 });
826             }
827 
828             return future;
829         } catch (Throwable cause) { // Make sure to catch Throwable because we are doing a retain() in this method.
830             debugData.release();
831             return promise.setFailure(cause);
832         }
833     }
834 
835     /**
836      * Closes the connection if the graceful shutdown process has completed.
837      * @param future Represents the status that will be passed to the {@link #closeListener}.
838      */
839     private void checkCloseConnection(ChannelFuture future) {
840         // If this connection is closing and the graceful shutdown has completed, close the connection
841         // once this operation completes.
842         if (closeListener != null && isGracefulShutdownComplete()) {
843             ChannelFutureListener closeListener = this.closeListener;
844             // This method could be called multiple times
845             // and we don't want to notify the closeListener multiple times.
846             this.closeListener = null;
847             try {
848                 closeListener.operationComplete(future);
849             } catch (Exception e) {
850                 throw new IllegalStateException("Close listener threw an unexpected exception", e);
851             }
852         }
853     }
854 
855     /**
856      * Close the remote endpoint with with a {@code GO_AWAY} frame. Does <strong>not</strong> flush
857      * immediately, this is the responsibility of the caller.
858      */
859     private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause) {
860         long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
861         int lastKnownStream = connection().remote().lastStreamCreated();
862         return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), ctx.newPromise());
863     }
864 
865     private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
866         if (future.isSuccess()) {
867             closeStream(stream, future);
868         } else {
869             // The connection will be closed and so no need to change the resetSent flag to false.
870             onConnectionError(ctx, future.cause(), null);
871         }
872     }
873 
874     private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
875         if (!future.isSuccess()) {
876             onConnectionError(ctx, future.cause(), null);
877         }
878     }
879 
880     /**
881      * Returns the client preface string if this is a client connection, otherwise returns {@code null}.
882      */
883     private static ByteBuf clientPrefaceString(Http2Connection connection) {
884         return connection.isServer() ? connectionPrefaceBuf() : null;
885     }
886 
887     private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
888                                                  final long errorCode, final ByteBuf debugData, ChannelFuture future) {
889         try {
890             if (future.isSuccess()) {
891                 if (errorCode != NO_ERROR.code()) {
892                     if (logger.isDebugEnabled()) {
893                         logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
894                                      "debugData '{}'. Forcing shutdown of the connection.",
895                                      ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
896                     }
897                     ctx.close();
898                 }
899             } else {
900                 if (logger.isDebugEnabled()) {
901                     logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
902                                  "debugData '{}'. Forcing shutdown of the connection.",
903                                  ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
904                 }
905                 ctx.close();
906             }
907         } finally {
908             // We're done with the debug data now.
909             debugData.release();
910         }
911     }
912 
913     /**
914      * Closes the channel when the future completes.
915      */
916     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
917         private final ChannelHandlerContext ctx;
918         private final ChannelPromise promise;
919         private final ScheduledFuture<?> timeoutTask;
920 
921         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
922             this.ctx = ctx;
923             this.promise = promise;
924             timeoutTask = null;
925         }
926 
927         ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
928                                      long timeout, TimeUnit unit) {
929             this.ctx = ctx;
930             this.promise = promise;
931             timeoutTask = ctx.executor().schedule(new Runnable() {
932                 @Override
933                 public void run() {
934                     ctx.close(promise);
935                 }
936             }, timeout, unit);
937         }
938 
939         @Override
940         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
941             if (timeoutTask != null) {
942                 timeoutTask.cancel(false);
943             }
944             ctx.close(promise);
945         }
946     }
947 }