View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty5.handler.codec.http2;
16  
17  import io.netty5.buffer.BufferUtil;
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.channel.ChannelHandlerContext;
21  import io.netty5.channel.ChannelOption;
22  import io.netty5.handler.codec.ByteToMessageDecoder;
23  import io.netty5.handler.codec.http.HttpResponseStatus;
24  import io.netty5.handler.codec.http2.Http2Exception.CompositeStreamException;
25  import io.netty5.handler.codec.http2.Http2Exception.StreamException;
26  import io.netty5.util.concurrent.Future;
27  import io.netty5.util.concurrent.FutureListener;
28  import io.netty5.util.concurrent.Promise;
29  import io.netty5.util.internal.UnstableApi;
30  import io.netty5.util.internal.logging.InternalLogger;
31  import io.netty5.util.internal.logging.InternalLoggerFactory;
32  
33  import java.nio.charset.StandardCharsets;
34  import java.util.concurrent.TimeUnit;
35  
36  import static io.netty5.channel.ChannelFutureListeners.CLOSE_ON_FAILURE;
37  import static io.netty5.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
38  import static io.netty5.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuffer;
39  import static io.netty5.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
40  import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
41  import static io.netty5.handler.codec.http2.Http2Error.NO_ERROR;
42  import static io.netty5.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
43  import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
44  import static io.netty5.handler.codec.http2.Http2Exception.isStreamError;
45  import static io.netty5.handler.codec.http2.Http2FrameTypes.SETTINGS;
46  import static io.netty5.handler.codec.http2.Http2Stream.State.IDLE;
47  import static io.netty5.util.CharsetUtil.UTF_8;
48  import static java.lang.Math.min;
49  import static java.util.Objects.requireNonNull;
50  import static java.util.concurrent.TimeUnit.MILLISECONDS;
51  
52  /**
53   * Provides the default implementation for processing inbound frame events and delegates to a
54   * {@link Http2FrameListener}
55   * <p>
56   * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
57   * <p>
58   * This class enforces inbound flow control functionality through
59   * {@link Http2LocalFlowController}
60   */
61  @UnstableApi
62  public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager {
63  
64      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
65  
66      private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
67              HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
68      private static final Buffer HTTP_1_X_BUF =
69              BufferAllocator.offHeapUnpooled().copyOf(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'}).makeReadOnly();
70  
71      private final Http2ConnectionDecoder decoder;
72      private final Http2ConnectionEncoder encoder;
73      private final Http2Settings initialSettings;
74      private final boolean decoupleCloseAndGoAway;
75      private final boolean flushPreface;
76      private FutureListener<Object> closeListener;
77      private BaseDecoder byteDecoder;
78      private long gracefulShutdownTimeoutMillis;
79  
80      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
81                                       Http2Settings initialSettings) {
82          this(decoder, encoder, initialSettings, false);
83      }
84  
85      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
86                                       Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
87          this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
88      }
89  
90      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
91                                       Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
92                                       boolean flushPreface) {
93          this.initialSettings = requireNonNull(initialSettings, "initialSettings");
94          this.decoder = requireNonNull(decoder, "decoder");
95          this.encoder = requireNonNull(encoder, "encoder");
96          this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
97          this.flushPreface = flushPreface;
98          if (encoder.connection() != decoder.connection()) {
99              throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
100         }
101     }
102 
103     /**
104      * Get the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
105      * the connection during the graceful shutdown process. Returns -1 if this connection is configured to wait
106      * indefinitely for all streams to close.
107      */
108     public long gracefulShutdownTimeoutMillis() {
109         return gracefulShutdownTimeoutMillis;
110     }
111 
112     /**
113      * Set the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
114      * the connection during the graceful shutdown process.
115      * @param gracefulShutdownTimeoutMillis the amount of time (in milliseconds) this endpoint will wait for all
116      * streams to be closed before closing the connection during the graceful shutdown process.
117      */
118     public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
119         if (gracefulShutdownTimeoutMillis < -1) {
120             throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
121                                                " (expected: -1 for indefinite or >= 0)");
122         }
123         this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
124     }
125 
126     public Http2Connection connection() {
127         return encoder.connection();
128     }
129 
130     public Http2ConnectionDecoder decoder() {
131         return decoder;
132     }
133 
134     public Http2ConnectionEncoder encoder() {
135         return encoder;
136     }
137 
138     private boolean prefaceSent() {
139         return byteDecoder != null && byteDecoder.prefaceSent();
140     }
141 
142     /**
143      * Handles the client-side (cleartext) upgrade from HTTP to HTTP/2.
144      * Reserves local stream 1 for the HTTP/2 response.
145      */
146     public void onHttpClientUpgrade() throws Http2Exception {
147         if (connection().isServer()) {
148             throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
149         }
150         if (!prefaceSent()) {
151             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
152             // calling this method.
153             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
154         }
155         if (decoder.prefaceReceived()) {
156             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
157         }
158 
159         // Create a local stream used for the HTTP cleartext upgrade.
160         connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
161     }
162 
163     /**
164      * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
165      * @param settings the settings for the remote endpoint.
166      */
167     public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
168         if (!connection().isServer()) {
169             throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
170         }
171         if (!prefaceSent()) {
172             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
173             // calling this method.
174             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
175         }
176         if (decoder.prefaceReceived()) {
177             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
178         }
179 
180         // Apply the settings but no ACK is necessary.
181         encoder.remoteSettings(settings);
182 
183         // Create a stream in the half-closed state.
184         connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
185     }
186 
187     @Override
188     public void flush(ChannelHandlerContext ctx) {
189         try {
190             // Trigger pending writes in the remote flow controller.
191             encoder.flowController().writePendingBytes();
192             ctx.flush();
193         } catch (Http2Exception e) {
194             onError(ctx, true, e);
195         } catch (Throwable cause) {
196             onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
197         }
198     }
199 
200     private abstract class BaseDecoder {
201         public abstract void decode(ChannelHandlerContext ctx, Buffer in) throws Exception;
202         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
203         public void channelActive(ChannelHandlerContext ctx) throws Exception { }
204 
205         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
206             // Connection has terminated, close the encoder and decoder.
207             encoder().close();
208             decoder().close();
209 
210             // We need to remove all streams (not just the active ones).
211             // See https://github.com/netty/netty/issues/4838.
212             connection().close(ctx.newPromise());
213         }
214 
215         /**
216          * Determine if the HTTP/2 connection preface been sent.
217          */
218         public boolean prefaceSent() {
219             return true;
220         }
221     }
222 
223     private final class PrefaceDecoder extends BaseDecoder {
224         private Buffer clientPrefaceString;
225         private boolean prefaceSent;
226 
227         PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
228             clientPrefaceString = clientPrefaceString(encoder.connection());
229             // This handler was just added to the context. In case it was handled after
230             // the connection became active, send the connection preface now.
231             sendPreface(ctx);
232         }
233 
234         @Override
235         public boolean prefaceSent() {
236             return prefaceSent;
237         }
238 
239         @Override
240         public void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
241             try {
242                 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
243                     // After the preface is read, it is time to hand over control to the post initialized decoder.
244                     byteDecoder = new FrameDecoder();
245                     byteDecoder.decode(ctx, in);
246                 }
247             } catch (Throwable e) {
248                 onError(ctx, false, e);
249             }
250         }
251 
252         @Override
253         public void channelActive(ChannelHandlerContext ctx) throws Exception {
254             // The channel just became active - send the connection preface to the remote endpoint.
255             sendPreface(ctx);
256 
257             if (flushPreface) {
258                 // As we don't know if any channelReadComplete() events will be triggered at all we need to ensure we
259                 // also flush. Otherwise the remote peer might never see the preface / settings frame.
260                 // See https://github.com/netty/netty/issues/12089
261                 ctx.flush();
262             }
263         }
264 
265         @Override
266         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
267             cleanup();
268             super.channelInactive(ctx);
269         }
270 
271         /**
272          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
273          */
274         @Override
275         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
276             cleanup();
277         }
278 
279         /**
280          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
281          */
282         private void cleanup() {
283             if (clientPrefaceString != null) {
284                 clientPrefaceString.close();
285                 clientPrefaceString = null;
286             }
287         }
288 
289         /**
290          * Decodes the client connection preface string from the input buffer.
291          *
292          * @return {@code true} if processing of the client preface string is complete. Since client preface strings can
293          *         only be received by servers, returns true immediately for client endpoints.
294          */
295         private boolean readClientPrefaceString(Buffer in) throws Http2Exception {
296             if (clientPrefaceString == null) {
297                 return true;
298             }
299 
300             int prefaceRemaining = clientPrefaceString.readableBytes();
301             int bytesRead = min(in.readableBytes(), prefaceRemaining);
302 
303             // If the input so far doesn't match the preface, break the connection.
304             if (bytesRead == 0 || !BufferUtil.equals(in, in.readerOffset(),
305                                                      clientPrefaceString, clientPrefaceString.readerOffset(),
306                                                      bytesRead)) {
307                 int maxSearch = 1024; // picked because 512 is too little, and 2048 too much
308                 in.makeReadOnly();
309                 try (Buffer preface = in.copy(in.readerOffset(), min(in.readableBytes(), maxSearch), true)) {
310                     int http1Index = preface.bytesBefore(HTTP_1_X_BUF);
311                     if (http1Index != -1) {
312                         try (Buffer chunk = preface.readSplit(http1Index)) {
313                             throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s",
314                                                   chunk.toString(StandardCharsets.US_ASCII));
315                         }
316                     }
317                 }
318                 String receivedBytes = BufferUtil.hexDump(in, in.readerOffset(),
319                                                min(in.readableBytes(), clientPrefaceString.readableBytes()));
320                 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
321                                                       "Hex dump for received bytes: %s", receivedBytes);
322             }
323             in.skipReadableBytes(bytesRead);
324             clientPrefaceString.skipReadableBytes(bytesRead);
325 
326             if (clientPrefaceString.readableBytes() == 0) {
327                 // Entire preface has been read.
328                 clientPrefaceString.close();
329                 clientPrefaceString = null;
330                 return true;
331             }
332             return false;
333         }
334 
335         /**
336          * Peeks at that the next frame in the buffer and verifies that it is a non-ack {@code SETTINGS} frame.
337          *
338          * @param in the inbound buffer.
339          * @return {@code true} if the next frame is a non-ack {@code SETTINGS} frame, {@code false} if more
340          * data is required before we can determine the next frame type.
341          * @throws Http2Exception thrown if the next frame is NOT a non-ack {@code SETTINGS} frame.
342          */
343         private boolean verifyFirstFrameIsSettings(Buffer in) throws Http2Exception {
344             if (in.readableBytes() < 5) {
345                 // Need more data before we can see the frame type for the first frame.
346                 return false;
347             }
348 
349             int frameType = in.getUnsignedByte(in.readerOffset() + 3);
350             int flags = in.getUnsignedByte(in.readerOffset() + 4);
351             if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {
352                 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
353                                                       "Hex dump for first 5 bytes: %s",
354                                       BufferUtil.hexDump(in, in.readerOffset(), 5));
355             }
356             return true;
357         }
358 
359         /**
360          * Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent.
361          */
362         private void sendPreface(ChannelHandlerContext ctx) throws Exception {
363             if (prefaceSent || !ctx.channel().isActive()) {
364                 return;
365             }
366 
367             prefaceSent = true;
368 
369             final boolean isClient = !connection().isServer();
370             if (isClient) {
371                 // Clients must send the preface string as the first bytes on the connection.
372                 ctx.write(connectionPrefaceBuffer()).addListener(ctx.channel(), CLOSE_ON_FAILURE);
373             }
374 
375             // Both client and server must send their initial settings.
376             encoder.writeSettings(ctx, initialSettings)
377                    .addListener(ctx.channel(), CLOSE_ON_FAILURE);
378 
379             if (isClient) {
380                 // If this handler is extended by the user and we directly fire the userEvent from this context then
381                 // the user will not see the event. We should fire the event starting with this handler so this class
382                 // (and extending classes) have a chance to process the event.
383                 channelInboundEvent(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
384             }
385         }
386     }
387 
388     private final class FrameDecoder extends BaseDecoder {
389         @Override
390         public void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
391             try {
392                 decoder.decodeFrame(ctx, in);
393             } catch (Throwable e) {
394                 onError(ctx, false, e);
395             }
396         }
397     }
398 
399     @Override
400     public void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
401         // Initialize the encoder, decoder, flow controllers, and internal state.
402         encoder.lifecycleManager(this);
403         decoder.lifecycleManager(this);
404         encoder.flowController().channelHandlerContext(ctx);
405         decoder.flowController().channelHandlerContext(ctx);
406         byteDecoder = new PrefaceDecoder(ctx);
407     }
408 
409     @Override
410     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
411         if (byteDecoder != null) {
412             byteDecoder.handlerRemoved(ctx);
413             byteDecoder = null;
414         }
415     }
416 
417     @Override
418     public void channelActive(ChannelHandlerContext ctx) throws Exception {
419         if (byteDecoder == null) {
420             byteDecoder = new PrefaceDecoder(ctx);
421         }
422         byteDecoder.channelActive(ctx);
423         super.channelActive(ctx);
424     }
425 
426     @Override
427     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
428         // Call super class first, as this may result in decode being called.
429         super.channelInactive(ctx);
430         if (byteDecoder != null) {
431             byteDecoder.channelInactive(ctx);
432             byteDecoder = null;
433         }
434     }
435 
436     @Override
437     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
438         // Writability is expected to change while we are writing. We cannot allow this event to trigger reentering
439         // the allocation and write loop. Reentering the event loop will lead to over or illegal allocation.
440         try {
441             if (ctx.channel().isWritable()) {
442                 flush(ctx);
443             }
444             encoder.flowController().channelWritabilityChanged();
445         } finally {
446             super.channelWritabilityChanged(ctx);
447         }
448     }
449 
450     @Override
451     protected void decode(ChannelHandlerContext ctx, Buffer in) throws Exception {
452         byteDecoder.decode(ctx, in);
453     }
454 
455     @Override
456     public Future<Void> close(ChannelHandlerContext ctx) {
457         if (decoupleCloseAndGoAway) {
458             return ctx.close();
459         }
460         // Avoid NotYetConnectedException and avoid sending before connection preface
461         if (!ctx.channel().isActive() || !prefaceSent()) {
462             return ctx.close();
463         }
464 
465         // If the user has already sent a GO_AWAY frame they may be attempting to do a graceful shutdown which requires
466         // sending multiple GO_AWAY frames. We should only send a GO_AWAY here if one has not already been sent. If
467         // a GO_AWAY has been sent we send a empty buffer just so we can wait to close until all other data has been
468         // flushed to the OS.
469         // https://github.com/netty/netty/issues/5307
470         Future<Void> f = connection().goAwaySent() ? ctx.write(ctx.bufferAllocator().allocate(0)) : goAway(ctx, null);
471         ctx.flush();
472         Promise<Void> promise = ctx.newPromise();
473         doGracefulShutdown(ctx, f, promise);
474         return promise.asFuture();
475     }
476 
477     private FutureListener<Object> newClosingChannelFutureListener(
478             ChannelHandlerContext ctx, Promise<Void> promise) {
479         long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
480         return gracefulShutdownTimeoutMillis < 0 ?
481                 new ClosingChannelFutureListener(ctx, promise) :
482                 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
483     }
484 
485     private void doGracefulShutdown(ChannelHandlerContext ctx, Future<Void> future, final Promise<Void> promise) {
486         FutureListener<Object> listener = newClosingChannelFutureListener(ctx, promise);
487         if (isGracefulShutdownComplete()) {
488             // If there are no active streams, close immediately after the GO_AWAY write completes or the timeout
489             // elapsed.
490             future.addListener(listener);
491         } else {
492             // If there are active streams we should wait until they are all closed before closing the connection.
493 
494             // The ClosingChannelFutureListener will cascade promise completion. We need to always notify the
495             // new ClosingChannelFutureListener when the graceful close completes if the promise is not null.
496             if (closeListener == null) {
497                 closeListener = listener;
498             } else if (promise != null) {
499                 FutureListener<Object> oldCloseListener = closeListener;
500                 closeListener = future1 -> {
501                     try {
502                         oldCloseListener.operationComplete(future1);
503                     } finally {
504                         listener.operationComplete(future1);
505                     }
506                 };
507             }
508         }
509     }
510 
511     @Override
512     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
513         // Trigger flush after read on the assumption that flush is cheap if there is nothing to write and that
514         // for flow-control the read may release window that causes data to be written that can now be flushed.
515         try {
516             // First call channelReadComplete0(...) as this may produce more data that we want to flush
517             channelReadComplete0(ctx);
518         } finally {
519             flush(ctx);
520         }
521     }
522 
523     final void channelReadComplete0(ChannelHandlerContext ctx) {
524         // Discard bytes of the cumulation buffer if needed.
525         discardSomeReadBytes();
526 
527         // Ensure we never stall the HTTP/2 Channel. Flow-control is enforced by HTTP/2.
528         //
529         // See https://tools.ietf.org/html/rfc7540#section-5.2.2
530         if (!ctx.channel().getOption(ChannelOption.AUTO_READ)) {
531             ctx.read();
532         }
533 
534         ctx.fireChannelReadComplete();
535     }
536 
537     /**
538      * Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
539      */
540     @Override
541     public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
542         if (getEmbeddedHttp2Exception(cause) != null) {
543             // Some exception in the causality chain is an Http2Exception - handle it.
544             onError(ctx, false, cause);
545         } else {
546             super.channelExceptionCaught(ctx, cause);
547         }
548     }
549 
550     /**
551      * Closes the local side of the given stream. If this causes the stream to be closed, adds a
552      * hook to close the channel after the given future completes.
553      *
554      * @param stream the stream to be half closed.
555      * @param future If closing, the future after which to close the channel.
556      */
557     @Override
558     public void closeStreamLocal(Http2Stream stream, Future<Void> future) {
559         switch (stream.state()) {
560             case HALF_CLOSED_LOCAL:
561             case OPEN:
562                 stream.closeLocalSide();
563                 break;
564             default:
565                 closeStream(stream, future);
566                 break;
567         }
568     }
569 
570     /**
571      * Closes the remote side of the given stream. If this causes the stream to be closed, adds a
572      * hook to close the channel after the given future completes.
573      *
574      * @param stream the stream to be half closed.
575      * @param future If closing, the future after which to close the channel.
576      */
577     @Override
578     public void closeStreamRemote(Http2Stream stream, Future<Void> future) {
579         switch (stream.state()) {
580             case HALF_CLOSED_REMOTE:
581             case OPEN:
582                 stream.closeRemoteSide();
583                 break;
584             default:
585                 closeStream(stream, future);
586                 break;
587         }
588     }
589 
590     @Override
591     public void closeStream(final Http2Stream stream, Future<Void> future) {
592         stream.close();
593 
594         if (future.isDone()) {
595             checkCloseConnection(future);
596         } else {
597             future.addListener(this::checkCloseConnection);
598         }
599     }
600 
601     /**
602      * Central handler for all exceptions caught during HTTP/2 processing.
603      */
604     @Override
605     public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
606         Http2Exception embedded = getEmbeddedHttp2Exception(cause);
607         if (isStreamError(embedded)) {
608             onStreamError(ctx, outbound, cause, (StreamException) embedded);
609         } else if (embedded instanceof CompositeStreamException) {
610             CompositeStreamException compositException = (CompositeStreamException) embedded;
611             for (StreamException streamException : compositException) {
612                 onStreamError(ctx, outbound, cause, streamException);
613             }
614         } else {
615             onConnectionError(ctx, outbound, cause, embedded);
616         }
617         ctx.flush();
618     }
619 
620     /**
621      * Called by the graceful shutdown logic to determine when it is safe to close the connection. Returns {@code true}
622      * if the graceful shutdown has completed and the connection can be safely closed. This implementation just
623      * guarantees that there are no active streams. Subclasses may override to provide additional checks.
624      */
625     protected boolean isGracefulShutdownComplete() {
626         return connection().numActiveStreams() == 0;
627     }
628 
629     /**
630      * Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint. Once all
631      * streams are closed, the connection is shut down.
632      *
633      * @param ctx the channel context
634      * @param outbound {@code true} if the error was caused by an outbound operation.
635      * @param cause the exception that was caught
636      * @param http2Ex the {@link Http2Exception} that is embedded in the causality chain. This may
637      *            be {@code null} if it's an unknown exception.
638      */
639     protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
640                                      Throwable cause, Http2Exception http2Ex) {
641         if (http2Ex == null) {
642             http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
643         }
644 
645         Promise<Void> promise = ctx.newPromise();
646         Future<Void> future = goAway(ctx, http2Ex);
647         if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
648             doGracefulShutdown(ctx, future, promise);
649         } else {
650             future.addListener(newClosingChannelFutureListener(ctx, promise));
651         }
652     }
653 
654     /**
655      * Handler for a stream error. Sends a {@code RST_STREAM} frame to the remote endpoint and closes the
656      * stream.
657      *
658      * @param ctx the channel context
659      * @param outbound {@code true} if the error was caused by an outbound operation.
660      * @param cause the exception that was caught
661      * @param http2Ex the {@link StreamException} that is embedded in the causality chain.
662      */
663     protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
664                                  @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
665         final int streamId = http2Ex.streamId();
666         Http2Stream stream = connection().stream(streamId);
667 
668         //if this is caused by reading headers that are too large, send a header with status 431
669         if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
670             ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
671             connection().isServer()) {
672 
673             // NOTE We have to check to make sure that a stream exists before we send our reply.
674             // We likely always create the stream below as the stream isn't created until the
675             // header block is completely processed.
676 
677             // The case of a streamId referring to a stream which was already closed is handled
678             // by createStream and will land us in the catch block below
679             if (stream == null) {
680                 try {
681                     stream = encoder.connection().remote().createStream(streamId, true);
682                 } catch (Http2Exception e) {
683                     resetUnknownStream(ctx, streamId, http2Ex.error().code());
684                     return;
685                 }
686             }
687 
688             // ensure that we have not already sent headers on this stream
689             if (stream != null && !stream.isHeadersSent()) {
690                 try {
691                     handleServerHeaderDecodeSizeError(ctx, stream);
692                 } catch (Throwable cause2) {
693                     onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
694                 }
695             }
696         }
697 
698         if (stream == null) {
699             if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
700                 resetUnknownStream(ctx, streamId, http2Ex.error().code());
701             }
702         } else {
703             resetStream(ctx, stream, http2Ex.error().code());
704         }
705     }
706 
707     /**
708      * Notifies client that this server has received headers that are larger than what it is
709      * willing to accept. Override to change behavior.
710      *
711      * @param ctx the channel context
712      * @param stream the Http2Stream on which the header was received
713      */
714     protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
715         encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true);
716     }
717 
718     protected Http2FrameWriter frameWriter() {
719         return encoder().frameWriter();
720     }
721 
722     /**
723      * Sends a {@code RST_STREAM} frame even if we don't know about the stream. This error condition is most likely
724      * triggered by the first frame of a stream being invalid. That is, there was an error reading the frame before
725      * we could create a new stream.
726      */
727     private Future<Void> resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode) {
728         Future<Void> future = frameWriter().writeRstStream(ctx, streamId, errorCode);
729         if (future.isDone()) {
730             closeConnectionOnError(ctx, future);
731         } else {
732             future.addListener(ctx, this::closeConnectionOnError);
733         }
734         return future;
735     }
736 
737     @Override
738     public Future<Void> resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode) {
739         final Http2Stream stream = connection().stream(streamId);
740         if (stream == null) {
741             return resetUnknownStream(ctx, streamId, errorCode);
742         }
743 
744        return resetStream(ctx, stream, errorCode);
745     }
746 
747     private Future<Void> resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
748                                       long errorCode) {
749         if (stream.isResetSent()) {
750             // Don't write a RST_STREAM frame if we have already written one.
751             return ctx.newSucceededFuture();
752         }
753         // Synchronously set the resetSent flag to prevent any subsequent calls
754         // from resulting in multiple reset frames being sent.
755         //
756         // This needs to be done before we notify the promise as the promise may have a listener attached that
757         // call resetStream(...) again.
758         stream.resetSent();
759 
760         final Future<Void> future;
761         // If the remote peer is not aware of the steam, then we are not allowed to send a RST_STREAM
762         // https://tools.ietf.org/html/rfc7540#section-6.4.
763         if (stream.state() == IDLE ||
764             connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
765             future = ctx.newSucceededFuture();
766         } else {
767             future = frameWriter().writeRstStream(ctx, stream.id(), errorCode);
768         }
769         if (future.isDone()) {
770             processRstStreamWriteResult(ctx, stream, future);
771         } else {
772             future.addListener(future1 -> processRstStreamWriteResult(ctx, stream, future1));
773         }
774 
775         return future;
776     }
777 
778     @Override
779     public Future<Void> goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
780                                 final Buffer debugData) {
781         final Http2Connection connection = connection();
782         try {
783             if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
784                 debugData.close();
785                 return ctx.newSucceededFuture();
786             }
787         } catch (Throwable cause) {
788             debugData.close();
789             return ctx.newFailedFuture(cause);
790         }
791 
792         // Need to retain a shallow-copy before we write the buffer because we may need it later for debug-logging,
793         // and the debugData buffer will be freed by the go-away write.
794         debugData.makeReadOnly();
795         Buffer debugDataCopy = debugData.copy(true);
796         Future<Void> future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData);
797 
798         if (future.isDone()) {
799             processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugDataCopy, future);
800         } else {
801             future.addListener(future1 ->
802                     processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugDataCopy, future1));
803         }
804 
805         return future;
806     }
807 
808     /**
809      * Closes the connection if the graceful shutdown process has completed.
810      * @param future Represents the status that will be passed to the {@link #closeListener}.
811      */
812     private void checkCloseConnection(Future<?> future) {
813         // If this connection is closing and the graceful shutdown has completed, close the connection
814         // once this operation completes.
815         if (closeListener != null && isGracefulShutdownComplete()) {
816             FutureListener<Object> closeListener = this.closeListener;
817             // This method could be called multiple times
818             // and we don't want to notify the closeListener multiple times.
819             this.closeListener = null;
820             try {
821                 closeListener.operationComplete(future);
822             } catch (Exception e) {
823                 throw new IllegalStateException("Close listener threw an unexpected exception", e);
824             }
825         }
826     }
827 
828     /**
829      * Close the remote endpoint with a {@code GO_AWAY} frame. Does <strong>not</strong> flush
830      * immediately, this is the responsibility of the caller.
831      */
832     private Future<Void> goAway(ChannelHandlerContext ctx, Http2Exception cause) {
833         long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
834         int lastKnownStream;
835         if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
836             // The hard shutdown could have been triggered during header processing, before updating
837             // lastStreamCreated(). Specifically, any connection errors encountered by Http2FrameReader or HPACK
838             // decoding will fail to update the last known stream. So we must be pessimistic.
839             // https://github.com/netty/netty/issues/10670
840             lastKnownStream = Integer.MAX_VALUE;
841         } else {
842             lastKnownStream = connection().remote().lastStreamCreated();
843         }
844         return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toBuffer(ctx, cause));
845     }
846 
847     @SuppressWarnings("unchecked")
848     private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, Future<?> future) {
849         if (future.isSuccess()) {
850             closeStream(stream, (Future<Void>) future);
851         } else {
852             // The connection will be closed and so no need to change the resetSent flag to false.
853             onConnectionError(ctx, true, future.cause(), null);
854         }
855     }
856 
857     private void closeConnectionOnError(ChannelHandlerContext ctx, Future<?> future) {
858         if (future.isFailed()) {
859             onConnectionError(ctx, true, future.cause(), null);
860         }
861     }
862 
863     /**
864      * Returns the client preface string if this is a client connection, otherwise returns {@code null}.
865      */
866     private static Buffer clientPrefaceString(Http2Connection connection) {
867         return connection.isServer() ? connectionPrefaceBuffer() : null;
868     }
869 
870     private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
871                                                  final long errorCode, final Buffer debugData, Future<?> future) {
872         try (debugData) {
873             if (future.isSuccess()) {
874                 if (errorCode != NO_ERROR.code()) {
875                     if (logger.isDebugEnabled()) {
876                         logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
877                                      "debugData '{}'. Forcing shutdown of the connection.",
878                                      ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
879                     }
880                     ctx.close();
881                 }
882             } else {
883                 if (logger.isDebugEnabled()) {
884                     logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
885                                  "debugData '{}'. Forcing shutdown of the connection.",
886                                  ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
887                 }
888                 ctx.close();
889             }
890         }
891     }
892 
893     /**
894      * Closes the channel when the future completes.
895      */
896     private static final class ClosingChannelFutureListener implements FutureListener<Object> {
897         private final ChannelHandlerContext ctx;
898         private final Promise<Void> promise;
899         private final Future<?> timeoutTask;
900         private boolean closed;
901 
902         ClosingChannelFutureListener(ChannelHandlerContext ctx, Promise<Void> promise) {
903             this.ctx = ctx;
904             this.promise = promise;
905             timeoutTask = null;
906         }
907 
908         ClosingChannelFutureListener(final ChannelHandlerContext ctx, final Promise<Void> promise,
909                                      long timeout, TimeUnit unit) {
910             this.ctx = ctx;
911             this.promise = promise;
912             timeoutTask = ctx.executor().schedule(this::doClose, timeout, unit);
913         }
914 
915         @Override
916         public void operationComplete(Future<?> sentGoAwayFuture) {
917             if (timeoutTask != null) {
918                 timeoutTask.cancel();
919             }
920             doClose();
921         }
922 
923         private void doClose() {
924             // We need to guard against multiple calls as the timeout may trigger close() first and then it will be
925             // triggered again because of operationComplete(...) is called.
926             if (closed) {
927                 // This only happens if we also scheduled a timeout task.
928                 assert timeoutTask != null;
929                 return;
930             }
931             closed = true;
932             // Before trying to close we need to check if the handler still exists in the pipeline as it may
933             // have been removed already.
934             if (!ctx.isRemoved()) {
935                 if (promise == null) {
936                     ctx.close();
937                 } else {
938                     ctx.close().cascadeTo(promise);
939                 }
940             } else if (promise != null) {
941                 promise.setSuccess(null);
942             }
943         }
944     }
945 }