View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufUtil;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelOutboundHandler;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.handler.codec.ByteToMessageDecoder;
26  import io.netty.handler.codec.http.HttpResponseStatus;
27  import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28  import io.netty.handler.codec.http2.Http2Exception.StreamException;
29  import io.netty.util.CharsetUtil;
30  import io.netty.util.concurrent.ScheduledFuture;
31  import io.netty.util.internal.UnstableApi;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.net.SocketAddress;
36  import java.util.List;
37  import java.util.concurrent.TimeUnit;
38  
39  import static io.netty.buffer.ByteBufUtil.hexDump;
40  import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
41  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
42  import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
43  import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
44  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
45  import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
46  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
47  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
48  import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
49  import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
50  import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
51  import static io.netty.util.CharsetUtil.UTF_8;
52  import static io.netty.util.internal.ObjectUtil.checkNotNull;
53  import static java.lang.Math.min;
54  import static java.util.concurrent.TimeUnit.MILLISECONDS;
55  
56  /**
57   * Provides the default implementation for processing inbound frame events and delegates to a
58   * {@link Http2FrameListener}
59   * <p>
60   * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
61   * <p>
62   * This interface enforces inbound flow control functionality through
63   * {@link Http2LocalFlowController}
64   */
65  @UnstableApi
66  public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
67                                                                              ChannelOutboundHandler {
68  
    /** Logger for this handler class. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);

    /** Read-only response headers built from {@link HttpResponseStatus#REQUEST_HEADER_FIELDS_TOO_LARGE}. */
    private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
            HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
    /** The ASCII bytes {@code "HTTP/1."}, wrapped as an unreleasable, read-only buffer. */
    private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
        Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();

    private final Http2ConnectionDecoder decoder;
    private final Http2ConnectionEncoder encoder;
    /** Settings written to the remote endpoint as part of the connection preface. */
    private final Http2Settings initialSettings;
    /** Assigned by the graceful shutdown logic while streams are still active. */
    private ChannelFutureListener closeListener;
    /** Decoder for the current stage; {@code null} before the handler is added and after removal/inactivity. */
    private BaseDecoder byteDecoder;
    /** Graceful shutdown timeout in milliseconds; {@code -1} means wait indefinitely. */
    private long gracefulShutdownTimeoutMillis;
82  
83      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
84                                       Http2Settings initialSettings) {
85          this.initialSettings = checkNotNull(initialSettings, "initialSettings");
86          this.decoder = checkNotNull(decoder, "decoder");
87          this.encoder = checkNotNull(encoder, "encoder");
88          if (encoder.connection() != decoder.connection()) {
89              throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
90          }
91      }
92  
93      Http2ConnectionHandler(boolean server, Http2FrameWriter frameWriter, Http2FrameLogger frameLogger,
94                      Http2Settings initialSettings) {
95          this.initialSettings = checkNotNull(initialSettings, "initialSettings");
96  
97          Http2Connection connection = new DefaultHttp2Connection(server);
98  
99          Long maxHeaderListSize = initialSettings.maxHeaderListSize();
100         Http2FrameReader frameReader = new DefaultHttp2FrameReader(maxHeaderListSize == null ?
101                 new DefaultHttp2HeadersDecoder(true) :
102                 new DefaultHttp2HeadersDecoder(true, maxHeaderListSize));
103 
104         if (frameLogger != null) {
105             frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
106             frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
107         }
108         encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
109         decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader);
110     }
111 
    /**
     * Get the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
     * the connection during the graceful shutdown process. Returns -1 if this connection is configured to wait
     * indefinitely for all streams to close.
     *
     * @return the configured graceful shutdown timeout in milliseconds, or {@code -1} for no timeout.
     */
    public long gracefulShutdownTimeoutMillis() {
        return gracefulShutdownTimeoutMillis;
    }
120 
121     /**
122      * Set the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
123      * the connection during the graceful shutdown process.
124      * @param gracefulShutdownTimeoutMillis the amount of time (in milliseconds) this endpoint will wait for all
125      * streams to be closed before closing the connection during the graceful shutdown process.
126      */
127     public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
128         if (gracefulShutdownTimeoutMillis < -1) {
129             throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
130                                                " (expected: -1 for indefinite or >= 0)");
131         }
132         this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
133     }
134 
    /**
     * Returns the connection object obtained from the encoder.
     */
    public Http2Connection connection() {
        return encoder.connection();
    }
138 
    /**
     * Returns the decoder used by this handler for inbound frames.
     */
    public Http2ConnectionDecoder decoder() {
        return decoder;
    }
142 
    /**
     * Returns the encoder used by this handler for outbound frames.
     */
    public Http2ConnectionEncoder encoder() {
        return encoder;
    }
146 
147     private boolean prefaceSent() {
148         return byteDecoder != null && byteDecoder.prefaceSent();
149     }
150 
151     /**
152      * Handles the client-side (cleartext) upgrade from HTTP to HTTP/2.
153      * Reserves local stream 1 for the HTTP/2 response.
154      */
155     public void onHttpClientUpgrade() throws Http2Exception {
156         if (connection().isServer()) {
157             throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
158         }
159         if (!prefaceSent()) {
160             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
161             // calling this method.
162             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
163         }
164         if (decoder.prefaceReceived()) {
165             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
166         }
167 
168         // Create a local stream used for the HTTP cleartext upgrade.
169         connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
170     }
171 
172     /**
173      * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
174      * @param settings the settings for the remote endpoint.
175      */
176     public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
177         if (!connection().isServer()) {
178             throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
179         }
180         if (!prefaceSent()) {
181             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
182             // calling this method.
183             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
184         }
185         if (decoder.prefaceReceived()) {
186             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
187         }
188 
189         // Apply the settings but no ACK is necessary.
190         encoder.remoteSettings(settings);
191 
192         // Create a stream in the half-closed state.
193         connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
194     }
195 
196     @Override
197     public void flush(ChannelHandlerContext ctx) {
198         try {
199             // Trigger pending writes in the remote flow controller.
200             encoder.flowController().writePendingBytes();
201             ctx.flush();
202         } catch (Http2Exception e) {
203             onError(ctx, true, e);
204         } catch (Throwable cause) {
205             onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
206         }
207     }
208 
    /**
     * Base type for the stage-specific decoders this handler delegates to. The lifecycle callbacks default to
     * no-ops, except {@link #channelInactive(ChannelHandlerContext)} which tears the connection down.
     */
    private abstract class BaseDecoder {
        /** Decodes inbound bytes for the current stage. */
        public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
        /** Called when the enclosing handler is removed; no-op by default. */
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
        /** Called when the channel becomes active; no-op by default. */
        public void channelActive(ChannelHandlerContext ctx) throws Exception { }

        /** Closes the encoder, the decoder and finally the connection itself. */
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // Connection has terminated, close the encoder and decoder.
            encoder().close();
            decoder().close();

            // We need to remove all streams (not just the active ones).
            // See https://github.com/netty/netty/issues/4838.
            connection().close(ctx.voidPromise());
        }

        /**
         * Determine if the HTTP/2 connection preface has been sent. The base implementation always
         * returns {@code true}.
         */
        public boolean prefaceSent() {
            return true;
        }
    }
231 
232     private final class PrefaceDecoder extends BaseDecoder {
233         private ByteBuf clientPrefaceString;
234         private boolean prefaceSent;
235 
236         public PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
237             clientPrefaceString = clientPrefaceString(encoder.connection());
238             // This handler was just added to the context. In case it was handled after
239             // the connection became active, send the connection preface now.
240             sendPreface(ctx);
241         }
242 
243         @Override
244         public boolean prefaceSent() {
245             return prefaceSent;
246         }
247 
248         @Override
249         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
250             try {
251                 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
252                     // After the preface is read, it is time to hand over control to the post initialized decoder.
253                     byteDecoder = new FrameDecoder();
254                     byteDecoder.decode(ctx, in, out);
255                 }
256             } catch (Throwable e) {
257                 onError(ctx, false, e);
258             }
259         }
260 
261         @Override
262         public void channelActive(ChannelHandlerContext ctx) throws Exception {
263             // The channel just became active - send the connection preface to the remote endpoint.
264             sendPreface(ctx);
265         }
266 
267         @Override
268         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
269             cleanup();
270             super.channelInactive(ctx);
271         }
272 
273         /**
274          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
275          */
276         @Override
277         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
278             cleanup();
279         }
280 
281         /**
282          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
283          */
284         private void cleanup() {
285             if (clientPrefaceString != null) {
286                 clientPrefaceString.release();
287                 clientPrefaceString = null;
288             }
289         }
290 
291         /**
292          * Decodes the client connection preface string from the input buffer.
293          *
294          * @return {@code true} if processing of the client preface string is complete. Since client preface strings can
295          *         only be received by servers, returns true immediately for client endpoints.
296          */
297         private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
298             if (clientPrefaceString == null) {
299                 return true;
300             }
301 
302             int prefaceRemaining = clientPrefaceString.readableBytes();
303             int bytesRead = min(in.readableBytes(), prefaceRemaining);
304 
305             // If the input so far doesn't match the preface, break the connection.
306             if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
307                                                       clientPrefaceString, clientPrefaceString.readerIndex(),
308                                                       bytesRead)) {
309                 int maxSearch = 1024; // picked because 512 is too little, and 2048 too much
310                 int http1Index =
311                     ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
312                 if (http1Index != -1) {
313                     String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
314                     throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
315                 }
316                 String receivedBytes = hexDump(in, in.readerIndex(),
317                                                min(in.readableBytes(), clientPrefaceString.readableBytes()));
318                 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
319                                                       "Hex dump for received bytes: %s", receivedBytes);
320             }
321             in.skipBytes(bytesRead);
322             clientPrefaceString.skipBytes(bytesRead);
323 
324             if (!clientPrefaceString.isReadable()) {
325                 // Entire preface has been read.
326                 clientPrefaceString.release();
327                 clientPrefaceString = null;
328                 return true;
329             }
330             return false;
331         }
332 
333         /**
334          * Peeks at that the next frame in the buffer and verifies that it is a non-ack {@code SETTINGS} frame.
335          *
336          * @param in the inbound buffer.
337          * @return {@code} true if the next frame is a non-ack {@code SETTINGS} frame, {@code false} if more
338          * data is required before we can determine the next frame type.
339          * @throws Http2Exception thrown if the next frame is NOT a non-ack {@code SETTINGS} frame.
340          */
341         private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
342             if (in.readableBytes() < 5) {
343                 // Need more data before we can see the frame type for the first frame.
344                 return false;
345             }
346 
347             short frameType = in.getUnsignedByte(in.readerIndex() + 3);
348             short flags = in.getUnsignedByte(in.readerIndex() + 4);
349             if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {
350                 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
351                                                       "Hex dump for first 5 bytes: %s",
352                                       hexDump(in, in.readerIndex(), 5));
353             }
354             return true;
355         }
356 
357         /**
358          * Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent.
359          */
360         private void sendPreface(ChannelHandlerContext ctx) throws Exception {
361             if (prefaceSent || !ctx.channel().isActive()) {
362                 return;
363             }
364 
365             prefaceSent = true;
366 
367             final boolean isClient = !connection().isServer();
368             if (isClient) {
369                 // Clients must send the preface string as the first bytes on the connection.
370                 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
371             }
372 
373             // Both client and server must send their initial settings.
374             encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
375                     ChannelFutureListener.CLOSE_ON_FAILURE);
376 
377             if (isClient) {
378                 // If this handler is extended by the user and we directly fire the userEvent from this context then
379                 // the user will not see the event. We should fire the event starting with this handler so this class
380                 // (and extending classes) have a chance to process the event.
381                 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
382             }
383         }
384     }
385 
    /**
     * Stage decoder installed once the preface has been processed; forwards all bytes to the
     * {@link Http2ConnectionDecoder} and reports any failure through {@code onError} as an inbound error.
     */
    private final class FrameDecoder extends BaseDecoder {
        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            try {
                decoder.decodeFrame(ctx, in, out);
            } catch (Throwable e) {
                // Errors are handled here rather than propagated to the caller.
                onError(ctx, false, e);
            }
        }
    }
396 
    /**
     * Wires this handler into the encoder, decoder and both flow controllers, then installs a
     * {@link PrefaceDecoder} as the current stage decoder.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // Initialize the encoder, decoder, flow controllers, and internal state.
        encoder.lifecycleManager(this);
        decoder.lifecycleManager(this);
        encoder.flowController().channelHandlerContext(ctx);
        decoder.flowController().channelHandlerContext(ctx);
        // The PrefaceDecoder constructor attempts to send the preface right away.
        byteDecoder = new PrefaceDecoder(ctx);
    }
406 
407     @Override
408     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
409         if (byteDecoder != null) {
410             byteDecoder.handlerRemoved(ctx);
411             byteDecoder = null;
412         }
413     }
414 
    /**
     * Ensures a stage decoder is installed, notifies it that the channel is active and then propagates the
     * event through the superclass.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // A decoder may be missing here, e.g. after it was cleared on a previous inactive/removed event.
        if (byteDecoder == null) {
            byteDecoder = new PrefaceDecoder(ctx);
        }
        byteDecoder.channelActive(ctx);
        super.channelActive(ctx);
    }
423 
424     @Override
425     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
426         // Call super class first, as this may result in decode being called.
427         super.channelInactive(ctx);
428         if (byteDecoder != null) {
429             byteDecoder.channelInactive(ctx);
430             byteDecoder = null;
431         }
432     }
433 
    /**
     * Flushes when the channel is writable, informs the remote flow controller about the writability change
     * and always propagates the event through the superclass afterwards.
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        // Writability is expected to change while we are writing. We cannot allow this event to trigger reentering
        // the allocation and write loop. Reentering the event loop will lead to over or illegal allocation.
        try {
            if (ctx.channel().isWritable()) {
                flush(ctx);
            }
            encoder.flowController().channelWritabilityChanged();
        } finally {
            // Propagate even if the flow controller threw.
            super.channelWritabilityChanged(ctx);
        }
    }
447 
    /**
     * Delegates decoding to the current stage decoder.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        byteDecoder.decode(ctx, in, out);
    }
452 
    /** Passes the bind request on through the context unchanged. */
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }
457 
    /** Passes the connect request on through the context unchanged. */
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
                        ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }
463 
    /** Passes the disconnect request on through the context unchanged. */
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.disconnect(promise);
    }
468 
469     @Override
470     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
471         promise = promise.unvoid();
472         // Avoid NotYetConnectedException
473         if (!ctx.channel().isActive()) {
474             ctx.close(promise);
475             return;
476         }
477 
478         // If the user has already sent a GO_AWAY frame they may be attempting to do a graceful shutdown which requires
479         // sending multiple GO_AWAY frames. We should only send a GO_AWAY here if one has not already been sent. If
480         // a GO_AWAY has been sent we send a empty buffer just so we can wait to close until all other data has been
481         // flushed to the OS.
482         // https://github.com/netty/netty/issues/5307
483         final ChannelFuture future = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null);
484         ctx.flush();
485         doGracefulShutdown(ctx, future, promise);
486     }
487 
488     private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, ChannelPromise promise) {
489         if (isGracefulShutdownComplete()) {
490             // If there are no active streams, close immediately after the GO_AWAY write completes.
491             future.addListener(new ClosingChannelFutureListener(ctx, promise));
492         } else {
493             // If there are active streams we should wait until they are all closed before closing the connection.
494             if (gracefulShutdownTimeoutMillis < 0) {
495                 closeListener = new ClosingChannelFutureListener(ctx, promise);
496             } else {
497                 closeListener = new ClosingChannelFutureListener(ctx, promise,
498                                                                  gracefulShutdownTimeoutMillis, MILLISECONDS);
499             }
500         }
501     }
502 
    /** Passes the deregister request on through the context unchanged. */
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }
507 
    /** Passes the read request on through the context unchanged. */
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }
512 
    /** Passes the write request on through the context unchanged. */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }
517 
    /**
     * Runs {@link #channelReadComplete0(ChannelHandlerContext)} and then always flushes, even if that call
     * throws.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Trigger flush after read on the assumption that flush is cheap if there is nothing to write and that
        // for flow-control the read may release window that causes data to be written that can now be flushed.
        try {
            // First call channelReadComplete0(...) as this may produce more data that we want to flush
            channelReadComplete0(ctx);
        } finally {
            flush(ctx);
        }
    }
529 
    /** Invokes the superclass read-complete handling without the flush added by this class. */
    void channelReadComplete0(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }
533 
534     /**
535      * Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
536      */
537     @Override
538     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
539         if (getEmbeddedHttp2Exception(cause) != null) {
540             // Some exception in the causality chain is an Http2Exception - handle it.
541             onError(ctx, false, cause);
542         } else {
543             super.exceptionCaught(ctx, cause);
544         }
545     }
546 
547     /**
548      * Closes the local side of the given stream. If this causes the stream to be closed, adds a
549      * hook to close the channel after the given future completes.
550      *
551      * @param stream the stream to be half closed.
552      * @param future If closing, the future after which to close the channel.
553      */
554     @Override
555     public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
556         switch (stream.state()) {
557             case HALF_CLOSED_LOCAL:
558             case OPEN:
559                 stream.closeLocalSide();
560                 break;
561             default:
562                 closeStream(stream, future);
563                 break;
564         }
565     }
566 
567     /**
568      * Closes the remote side of the given stream. If this causes the stream to be closed, adds a
569      * hook to close the channel after the given future completes.
570      *
571      * @param stream the stream to be half closed.
572      * @param future If closing, the future after which to close the channel.
573      */
574     @Override
575     public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
576         switch (stream.state()) {
577             case HALF_CLOSED_REMOTE:
578             case OPEN:
579                 stream.closeRemoteSide();
580                 break;
581             default:
582                 closeStream(stream, future);
583                 break;
584         }
585     }
586 
587     @Override
588     public void closeStream(final Http2Stream stream, ChannelFuture future) {
589         stream.close();
590 
591         if (future.isDone()) {
592             checkCloseConnection(future);
593         } else {
594             future.addListener(new ChannelFutureListener() {
595                 @Override
596                 public void operationComplete(ChannelFuture future) throws Exception {
597                     checkCloseConnection(future);
598                 }
599             });
600         }
601     }
602 
603     /**
604      * Central handler for all exceptions caught during HTTP/2 processing.
605      */
606     @Override
607     public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
608         Http2Exception embedded = getEmbeddedHttp2Exception(cause);
609         if (isStreamError(embedded)) {
610             onStreamError(ctx, outbound, cause, (StreamException) embedded);
611         } else if (embedded instanceof CompositeStreamException) {
612             CompositeStreamException compositException = (CompositeStreamException) embedded;
613             for (StreamException streamException : compositException) {
614                 onStreamError(ctx, outbound, cause, streamException);
615             }
616         } else {
617             onConnectionError(ctx, outbound, cause, embedded);
618         }
619         ctx.flush();
620     }
621 
    /**
     * Called by the graceful shutdown logic to determine when it is safe to close the connection. Returns {@code true}
     * if the graceful shutdown has completed and the connection can be safely closed. This implementation just
     * guarantees that there are no active streams. Subclasses may override to provide additional checks.
     *
     * @return {@code true} if the connection currently has no active streams.
     */
    protected boolean isGracefulShutdownComplete() {
        return connection().numActiveStreams() == 0;
    }
630 
631     /**
632      * Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint. Once all
633      * streams are closed, the connection is shut down.
634      *
635      * @param ctx the channel context
636      * @param outbound {@code true} if the error was caused by an outbound operation.
637      * @param cause the exception that was caught
638      * @param http2Ex the {@link Http2Exception} that is embedded in the causality chain. This may
639      *            be {@code null} if it's an unknown exception.
640      */
641     protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
642                                      Throwable cause, Http2Exception http2Ex) {
643         if (http2Ex == null) {
644             http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
645         }
646 
647         ChannelPromise promise = ctx.newPromise();
648         ChannelFuture future = goAway(ctx, http2Ex);
649         switch (http2Ex.shutdownHint()) {
650         case GRACEFUL_SHUTDOWN:
651             doGracefulShutdown(ctx, future, promise);
652             break;
653         default:
654             future.addListener(new ClosingChannelFutureListener(ctx, promise));
655             break;
656         }
657     }
658 
659     /**
660      * Handler for a stream error. Sends a {@code RST_STREAM} frame to the remote endpoint and closes the
661      * stream.
662      *
663      * @param ctx the channel context
664      * @param outbound {@code true} if the error was caused by an outbound operation.
665      * @param cause the exception that was caught
666      * @param http2Ex the {@link StreamException} that is embedded in the causality chain.
667      */
668     protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
669                                  @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
670         final int streamId = http2Ex.streamId();
671         Http2Stream stream = connection().stream(streamId);
672 
673         //if this is caused by reading headers that are too large, send a header with status 431
674         if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
675             ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
676             connection().isServer()) {
677 
678             // NOTE We have to check to make sure that a stream exists before we send our reply.
679             // We likely always create the stream below as the stream isn't created until the
680             // header block is completely processed.
681 
682             // The case of a streamId referring to a stream which was already closed is handled
683             // by createStream and will land us in the catch block below
684             if (stream == null) {
685                 try {
686                     stream = encoder.connection().remote().createStream(streamId, true);
687                 } catch (Http2Exception e) {
688                     resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
689                     return;
690                 }
691             }
692 
693             // ensure that we have not already sent headers on this stream
694             if (stream != null && !stream.isHeadersSent()) {
695                 try {
696                     handleServerHeaderDecodeSizeError(ctx, stream);
697                 } catch (Throwable cause2) {
698                     onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
699                 }
700             }
701         }
702 
703         if (stream == null) {
704             resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
705         } else {
706             resetStream(ctx, stream, http2Ex.error().code(), ctx.newPromise());
707         }
708     }
709 
710     /**
711      * Notifies client that this server has received headers that are larger than what it is
712      * willing to accept. Override to change behavior.
713      *
714      * @param ctx the channel context
715      * @param stream the Http2Stream on which the header was received
716      */
717     protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
718         encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
719     }
720 
721     protected Http2FrameWriter frameWriter() {
722         return encoder().frameWriter();
723     }
724 
725     /**
726      * Sends a {@code RST_STREAM} frame even if we don't know about the stream. This error condition is most likely
727      * triggered by the first frame of a stream being invalid. That is, there was an error reading the frame before
728      * we could create a new stream.
729      */
730     private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
731                                              ChannelPromise promise) {
732         ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
733         if (future.isDone()) {
734             closeConnectionOnError(ctx, future);
735         } else {
736             future.addListener(new ChannelFutureListener() {
737                 @Override
738                 public void operationComplete(ChannelFuture future) throws Exception {
739                     closeConnectionOnError(ctx, future);
740                 }
741             });
742         }
743         return future;
744     }
745 
746     @Override
747     public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
748                                      ChannelPromise promise) {
749         final Http2Stream stream = connection().stream(streamId);
750         if (stream == null) {
751             return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
752         }
753 
754        return resetStream(ctx, stream, errorCode, promise);
755     }
756 
757     private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
758                                       long errorCode, ChannelPromise promise) {
759         promise = promise.unvoid();
760         if (stream.isResetSent()) {
761             // Don't write a RST_STREAM frame if we have already written one.
762             return promise.setSuccess();
763         }
764         final ChannelFuture future;
765         // If the remote peer is not aware of the steam, then we are not allowed to send a RST_STREAM
766         // https://tools.ietf.org/html/rfc7540#section-6.4.
767         if (stream.state() == IDLE ||
768             connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
769             future = promise.setSuccess();
770         } else {
771             future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
772         }
773 
774         // Synchronously set the resetSent flag to prevent any subsequent calls
775         // from resulting in multiple reset frames being sent.
776         stream.resetSent();
777 
778         if (future.isDone()) {
779             processRstStreamWriteResult(ctx, stream, future);
780         } else {
781             future.addListener(new ChannelFutureListener() {
782                 @Override
783                 public void operationComplete(ChannelFuture future) throws Exception {
784                     processRstStreamWriteResult(ctx, stream, future);
785                 }
786             });
787         }
788 
789         return future;
790     }
791 
792     @Override
793     public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
794                                 final ByteBuf debugData, ChannelPromise promise) {
795         try {
796             promise = promise.unvoid();
797             final Http2Connection connection = connection();
798             if (connection().goAwaySent()) {
799                 // Protect against re-entrancy. Could happen if writing the frame fails, and error handling
800                 // treating this is a connection handler and doing a graceful shutdown...
801                 if (lastStreamId == connection().remote().lastStreamKnownByPeer()) {
802                     // Release the data and notify the promise
803                     debugData.release();
804                     return promise.setSuccess();
805                 }
806                 if (lastStreamId > connection.remote().lastStreamKnownByPeer()) {
807                     throw connectionError(PROTOCOL_ERROR, "Last stream identifier must not increase between " +
808                                                           "sending multiple GOAWAY frames (was '%d', is '%d').",
809                                           connection.remote().lastStreamKnownByPeer(), lastStreamId);
810                 }
811             }
812 
813             connection.goAwaySent(lastStreamId, errorCode, debugData);
814 
815             // Need to retain before we write the buffer because if we do it after the refCnt could already be 0 and
816             // result in an IllegalRefCountException.
817             debugData.retain();
818             ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
819 
820             if (future.isDone()) {
821                 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
822             } else {
823                 future.addListener(new ChannelFutureListener() {
824                     @Override
825                     public void operationComplete(ChannelFuture future) throws Exception {
826                         processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
827                     }
828                 });
829             }
830 
831             return future;
832         } catch (Throwable cause) { // Make sure to catch Throwable because we are doing a retain() in this method.
833             debugData.release();
834             return promise.setFailure(cause);
835         }
836     }
837 
838     /**
839      * Closes the connection if the graceful shutdown process has completed.
840      * @param future Represents the status that will be passed to the {@link #closeListener}.
841      */
842     private void checkCloseConnection(ChannelFuture future) {
843         // If this connection is closing and the graceful shutdown has completed, close the connection
844         // once this operation completes.
845         if (closeListener != null && isGracefulShutdownComplete()) {
846             ChannelFutureListener closeListener = this.closeListener;
847             // This method could be called multiple times
848             // and we don't want to notify the closeListener multiple times.
849             this.closeListener = null;
850             try {
851                 closeListener.operationComplete(future);
852             } catch (Exception e) {
853                 throw new IllegalStateException("Close listener threw an unexpected exception", e);
854             }
855         }
856     }
857 
858     /**
859      * Close the remote endpoint with with a {@code GO_AWAY} frame. Does <strong>not</strong> flush
860      * immediately, this is the responsibility of the caller.
861      */
862     private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause) {
863         long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
864         int lastKnownStream = connection().remote().lastStreamCreated();
865         return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), ctx.newPromise());
866     }
867 
868     private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
869         if (future.isSuccess()) {
870             closeStream(stream, future);
871         } else {
872             // The connection will be closed and so no need to change the resetSent flag to false.
873             onConnectionError(ctx, true, future.cause(), null);
874         }
875     }
876 
877     private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
878         if (!future.isSuccess()) {
879             onConnectionError(ctx, true, future.cause(), null);
880         }
881     }
882 
883     /**
884      * Returns the client preface string if this is a client connection, otherwise returns {@code null}.
885      */
886     private static ByteBuf clientPrefaceString(Http2Connection connection) {
887         return connection.isServer() ? connectionPrefaceBuf() : null;
888     }
889 
890     private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
891                                                  final long errorCode, final ByteBuf debugData, ChannelFuture future) {
892         try {
893             if (future.isSuccess()) {
894                 if (errorCode != NO_ERROR.code()) {
895                     if (logger.isDebugEnabled()) {
896                         logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
897                                      "debugData '{}'. Forcing shutdown of the connection.",
898                                      ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
899                     }
900                     ctx.close();
901                 }
902             } else {
903                 if (logger.isDebugEnabled()) {
904                     logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
905                                  "debugData '{}'. Forcing shutdown of the connection.",
906                                  ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
907                 }
908                 ctx.close();
909             }
910         } finally {
911             // We're done with the debug data now.
912             debugData.release();
913         }
914     }
915 
916     /**
917      * Closes the channel when the future completes.
918      */
919     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
920         private final ChannelHandlerContext ctx;
921         private final ChannelPromise promise;
922         private final ScheduledFuture<?> timeoutTask;
923 
924         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
925             this.ctx = ctx;
926             this.promise = promise;
927             timeoutTask = null;
928         }
929 
930         ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
931                                      long timeout, TimeUnit unit) {
932             this.ctx = ctx;
933             this.promise = promise;
934             timeoutTask = ctx.executor().schedule(new Runnable() {
935                 @Override
936                 public void run() {
937                     ctx.close(promise);
938                 }
939             }, timeout, unit);
940         }
941 
942         @Override
943         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
944             if (timeoutTask != null) {
945                 timeoutTask.cancel(false);
946             }
947             ctx.close(promise);
948         }
949     }
950 }