View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufUtil;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelOutboundHandler;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.handler.codec.ByteToMessageDecoder;
26  import io.netty.handler.codec.http.HttpResponseStatus;
27  import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28  import io.netty.handler.codec.http2.Http2Exception.StreamException;
29  import io.netty.util.CharsetUtil;
30  import io.netty.util.concurrent.Future;
31  import io.netty.util.internal.UnstableApi;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.net.SocketAddress;
36  import java.util.List;
37  import java.util.concurrent.TimeUnit;
38  
39  import static io.netty.buffer.ByteBufUtil.hexDump;
40  import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
41  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
42  import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
43  import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
44  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
45  import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
46  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
47  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
48  import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
49  import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
50  import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
51  import static io.netty.util.CharsetUtil.UTF_8;
52  import static io.netty.util.internal.ObjectUtil.checkNotNull;
53  import static java.lang.Math.min;
54  import static java.util.concurrent.TimeUnit.MILLISECONDS;
55  
56  /**
57   * Provides the default implementation for processing inbound frame events and delegates to a
58   * {@link Http2FrameListener}
59   * <p>
60   * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
61   * <p>
62   * This class enforces inbound flow control functionality through
63   * {@link Http2LocalFlowController}
64   */
65  @UnstableApi
66  public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
67                                                                              ChannelOutboundHandler {
68  
69      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
70  
71      private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
72              HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
73      private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
74          Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();
75  
76      private final Http2ConnectionDecoder decoder;
77      private final Http2ConnectionEncoder encoder;
78      private final Http2Settings initialSettings;
79      private final boolean decoupleCloseAndGoAway;
80      private final boolean flushPreface;
81      private ChannelFutureListener closeListener;
82      private BaseDecoder byteDecoder;
83      private long gracefulShutdownTimeoutMillis;
84  
85      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
86                                       Http2Settings initialSettings) {
87          this(decoder, encoder, initialSettings, false);
88      }
89  
90      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
91                                       Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
92          this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
93      }
94  
95      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
96                                       Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
97                                       boolean flushPreface) {
98          this.initialSettings = checkNotNull(initialSettings, "initialSettings");
99          this.decoder = checkNotNull(decoder, "decoder");
100         this.encoder = checkNotNull(encoder, "encoder");
101         this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
102         this.flushPreface = flushPreface;
103         if (encoder.connection() != decoder.connection()) {
104             throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
105         }
106     }
107 
108     /**
109      * Get the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
110      * the connection during the graceful shutdown process. Returns -1 if this connection is configured to wait
111      * indefinitely for all streams to close.
112      */
113     public long gracefulShutdownTimeoutMillis() {
114         return gracefulShutdownTimeoutMillis;
115     }
116 
117     /**
118      * Set the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
119      * the connection during the graceful shutdown process.
120      * @param gracefulShutdownTimeoutMillis the amount of time (in milliseconds) this endpoint will wait for all
121      * streams to be closed before closing the connection during the graceful shutdown process.
122      */
123     public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
124         if (gracefulShutdownTimeoutMillis < -1) {
125             throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
126                                                " (expected: -1 for indefinite or >= 0)");
127         }
128         this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
129     }
130 
131     public Http2Connection connection() {
132         return encoder.connection();
133     }
134 
135     public Http2ConnectionDecoder decoder() {
136         return decoder;
137     }
138 
139     public Http2ConnectionEncoder encoder() {
140         return encoder;
141     }
142 
143     private boolean prefaceSent() {
144         return byteDecoder != null && byteDecoder.prefaceSent();
145     }
146 
147     /**
148      * Handles the client-side (cleartext) upgrade from HTTP to HTTP/2.
149      * Reserves local stream 1 for the HTTP/2 response.
150      */
151     public void onHttpClientUpgrade() throws Http2Exception {
152         if (connection().isServer()) {
153             throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
154         }
155         if (!prefaceSent()) {
156             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
157             // calling this method.
158             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
159         }
160         if (decoder.prefaceReceived()) {
161             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
162         }
163 
164         // Create a local stream used for the HTTP cleartext upgrade.
165         connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
166     }
167 
168     /**
169      * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
170      * @param settings the settings for the remote endpoint.
171      */
172     public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
173         if (!connection().isServer()) {
174             throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
175         }
176         if (!prefaceSent()) {
177             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
178             // calling this method.
179             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
180         }
181         if (decoder.prefaceReceived()) {
182             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
183         }
184 
185         // Apply the settings but no ACK is necessary.
186         encoder.remoteSettings(settings);
187 
188         // Create a stream in the half-closed state.
189         connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
190     }
191 
192     @Override
193     public void flush(ChannelHandlerContext ctx) {
194         try {
195             // Trigger pending writes in the remote flow controller.
196             encoder.flowController().writePendingBytes();
197             ctx.flush();
198         } catch (Http2Exception e) {
199             onError(ctx, true, e);
200         } catch (Throwable cause) {
201             onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
202         }
203     }
204 
205     private abstract class BaseDecoder {
206         public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
207         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
208         public void channelActive(ChannelHandlerContext ctx) throws Exception { }
209 
210         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
211             // Connection has terminated, close the encoder and decoder.
212             encoder().close();
213             decoder().close();
214 
215             // We need to remove all streams (not just the active ones).
216             // See https://github.com/netty/netty/issues/4838.
217             connection().close(ctx.voidPromise());
218         }
219 
220         /**
221          * Determine if the HTTP/2 connection preface been sent.
222          */
223         public boolean prefaceSent() {
224             return true;
225         }
226     }
227 
228     private final class PrefaceDecoder extends BaseDecoder {
229         private ByteBuf clientPrefaceString;
230         private boolean prefaceSent;
231 
232         PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
233             clientPrefaceString = clientPrefaceString(encoder.connection());
234             // This handler was just added to the context. In case it was handled after
235             // the connection became active, send the connection preface now.
236             sendPreface(ctx);
237         }
238 
239         @Override
240         public boolean prefaceSent() {
241             return prefaceSent;
242         }
243 
244         @Override
245         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
246             try {
247                 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
248                     // After the preface is read, it is time to hand over control to the post initialized decoder.
249                     byteDecoder = new FrameDecoder();
250                     byteDecoder.decode(ctx, in, out);
251                 }
252             } catch (Throwable e) {
253                 onError(ctx, false, e);
254             }
255         }
256 
257         @Override
258         public void channelActive(ChannelHandlerContext ctx) throws Exception {
259             // The channel just became active - send the connection preface to the remote endpoint.
260             sendPreface(ctx);
261 
262             if (flushPreface) {
263                 // As we don't know if any channelReadComplete() events will be triggered at all we need to ensure we
264                 // also flush. Otherwise the remote peer might never see the preface / settings frame.
265                 // See https://github.com/netty/netty/issues/12089
266                 ctx.flush();
267             }
268         }
269 
270         @Override
271         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
272             cleanup();
273             super.channelInactive(ctx);
274         }
275 
276         /**
277          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
278          */
279         @Override
280         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
281             cleanup();
282         }
283 
284         /**
285          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
286          */
287         private void cleanup() {
288             if (clientPrefaceString != null) {
289                 clientPrefaceString.release();
290                 clientPrefaceString = null;
291             }
292         }
293 
294         /**
295          * Decodes the client connection preface string from the input buffer.
296          *
297          * @return {@code true} if processing of the client preface string is complete. Since client preface strings can
298          *         only be received by servers, returns true immediately for client endpoints.
299          */
300         private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
301             if (clientPrefaceString == null) {
302                 return true;
303             }
304 
305             int prefaceRemaining = clientPrefaceString.readableBytes();
306             int bytesRead = min(in.readableBytes(), prefaceRemaining);
307 
308             // If the input so far doesn't match the preface, break the connection.
309             if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
310                                                       clientPrefaceString, clientPrefaceString.readerIndex(),
311                                                       bytesRead)) {
312                 int maxSearch = 1024; // picked because 512 is too little, and 2048 too much
313                 int http1Index =
314                     ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
315                 if (http1Index != -1) {
316                     String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
317                     throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
318                 }
319                 String receivedBytes = hexDump(in, in.readerIndex(),
320                                                min(in.readableBytes(), clientPrefaceString.readableBytes()));
321                 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
322                                                       "Hex dump for received bytes: %s", receivedBytes);
323             }
324             in.skipBytes(bytesRead);
325             clientPrefaceString.skipBytes(bytesRead);
326 
327             if (!clientPrefaceString.isReadable()) {
328                 // Entire preface has been read.
329                 clientPrefaceString.release();
330                 clientPrefaceString = null;
331                 return true;
332             }
333             return false;
334         }
335 
336         /**
337          * Peeks at that the next frame in the buffer and verifies that it is a non-ack {@code SETTINGS} frame.
338          *
339          * @param in the inbound buffer.
340          * @return {@code true} if the next frame is a non-ack {@code SETTINGS} frame, {@code false} if more
341          * data is required before we can determine the next frame type.
342          * @throws Http2Exception thrown if the next frame is NOT a non-ack {@code SETTINGS} frame.
343          */
344         private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
345             if (in.readableBytes() < 5) {
346                 // Need more data before we can see the frame type for the first frame.
347                 return false;
348             }
349 
350             short frameType = in.getUnsignedByte(in.readerIndex() + 3);
351             short flags = in.getUnsignedByte(in.readerIndex() + 4);
352             if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {
353                 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
354                                                       "Hex dump for first 5 bytes: %s",
355                                       hexDump(in, in.readerIndex(), 5));
356             }
357             return true;
358         }
359 
360         /**
361          * Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent.
362          */
363         private void sendPreface(ChannelHandlerContext ctx) throws Exception {
364             if (prefaceSent || !ctx.channel().isActive()) {
365                 return;
366             }
367 
368             prefaceSent = true;
369 
370             final boolean isClient = !connection().isServer();
371             if (isClient) {
372                 // Clients must send the preface string as the first bytes on the connection.
373                 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
374             }
375 
376             // Both client and server must send their initial settings.
377             encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
378                     ChannelFutureListener.CLOSE_ON_FAILURE);
379 
380             if (isClient) {
381                 // If this handler is extended by the user and we directly fire the userEvent from this context then
382                 // the user will not see the event. We should fire the event starting with this handler so this class
383                 // (and extending classes) have a chance to process the event.
384                 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
385             }
386         }
387     }
388 
389     private final class FrameDecoder extends BaseDecoder {
390         @Override
391         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
392             try {
393                 decoder.decodeFrame(ctx, in, out);
394             } catch (Throwable e) {
395                 onError(ctx, false, e);
396             }
397         }
398     }
399 
400     @Override
401     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
402         // Initialize the encoder, decoder, flow controllers, and internal state.
403         encoder.lifecycleManager(this);
404         decoder.lifecycleManager(this);
405         encoder.flowController().channelHandlerContext(ctx);
406         decoder.flowController().channelHandlerContext(ctx);
407         byteDecoder = new PrefaceDecoder(ctx);
408     }
409 
410     @Override
411     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
412         if (byteDecoder != null) {
413             byteDecoder.handlerRemoved(ctx);
414             byteDecoder = null;
415         }
416     }
417 
418     @Override
419     public void channelActive(ChannelHandlerContext ctx) throws Exception {
420         if (byteDecoder == null) {
421             byteDecoder = new PrefaceDecoder(ctx);
422         }
423         byteDecoder.channelActive(ctx);
424         super.channelActive(ctx);
425     }
426 
427     @Override
428     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
429         // Call super class first, as this may result in decode being called.
430         super.channelInactive(ctx);
431         if (byteDecoder != null) {
432             byteDecoder.channelInactive(ctx);
433             byteDecoder = null;
434         }
435     }
436 
437     @Override
438     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
439         // Writability is expected to change while we are writing. We cannot allow this event to trigger reentering
440         // the allocation and write loop. Reentering the event loop will lead to over or illegal allocation.
441         try {
442             if (ctx.channel().isWritable()) {
443                 flush(ctx);
444             }
445             encoder.flowController().channelWritabilityChanged();
446         } finally {
447             super.channelWritabilityChanged(ctx);
448         }
449     }
450 
451     @Override
452     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
453         byteDecoder.decode(ctx, in, out);
454     }
455 
456     @Override
457     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
458         ctx.bind(localAddress, promise);
459     }
460 
461     @Override
462     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
463                         ChannelPromise promise) throws Exception {
464         ctx.connect(remoteAddress, localAddress, promise);
465     }
466 
467     @Override
468     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
469         ctx.disconnect(promise);
470     }
471 
472     @Override
473     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
474         if (decoupleCloseAndGoAway) {
475             ctx.close(promise);
476             return;
477         }
478         promise = promise.unvoid();
479         // Avoid NotYetConnectedException and avoid sending before connection preface
480         if (!ctx.channel().isActive() || !prefaceSent()) {
481             ctx.close(promise);
482             return;
483         }
484 
485         // If the user has already sent a GO_AWAY frame they may be attempting to do a graceful shutdown which requires
486         // sending multiple GO_AWAY frames. We should only send a GO_AWAY here if one has not already been sent. If
487         // a GO_AWAY has been sent we send a empty buffer just so we can wait to close until all other data has been
488         // flushed to the OS.
489         // https://github.com/netty/netty/issues/5307
490         ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
491         ctx.flush();
492         doGracefulShutdown(ctx, f, promise);
493     }
494 
495     private ChannelFutureListener newClosingChannelFutureListener(
496             ChannelHandlerContext ctx, ChannelPromise promise) {
497         long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
498         return gracefulShutdownTimeoutMillis < 0 ?
499                 new ClosingChannelFutureListener(ctx, promise) :
500                 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
501     }
502 
503     private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
504         final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
505         if (isGracefulShutdownComplete()) {
506             // If there are no active streams, close immediately after the GO_AWAY write completes or the timeout
507             // elapsed.
508             future.addListener(listener);
509         } else {
510             // If there are active streams we should wait until they are all closed before closing the connection.
511 
512             // The ClosingChannelFutureListener will cascade promise completion. We need to always notify the
513             // new ClosingChannelFutureListener when the graceful close completes if the promise is not null.
514             if (closeListener == null) {
515                 closeListener = listener;
516             } else if (promise != null) {
517                 final ChannelFutureListener oldCloseListener = closeListener;
518                 closeListener = new ChannelFutureListener() {
519                     @Override
520                     public void operationComplete(ChannelFuture future) throws Exception {
521                         try {
522                             oldCloseListener.operationComplete(future);
523                         } finally {
524                             listener.operationComplete(future);
525                         }
526                     }
527                 };
528             }
529         }
530     }
531 
532     @Override
533     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
534         ctx.deregister(promise);
535     }
536 
537     @Override
538     public void read(ChannelHandlerContext ctx) throws Exception {
539         ctx.read();
540     }
541 
542     @Override
543     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
544         ctx.write(msg, promise);
545     }
546 
547     @Override
548     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
549         // Trigger flush after read on the assumption that flush is cheap if there is nothing to write and that
550         // for flow-control the read may release window that causes data to be written that can now be flushed.
551         try {
552             // First call channelReadComplete0(...) as this may produce more data that we want to flush
553             channelReadComplete0(ctx);
554         } finally {
555             flush(ctx);
556         }
557     }
558 
559     final void channelReadComplete0(ChannelHandlerContext ctx) {
560         // Discard bytes of the cumulation buffer if needed.
561         discardSomeReadBytes();
562 
563         // Ensure we never stale the HTTP/2 Channel. Flow-control is enforced by HTTP/2.
564         //
565         // See https://tools.ietf.org/html/rfc7540#section-5.2.2
566         if (!ctx.channel().config().isAutoRead()) {
567             ctx.read();
568         }
569 
570         ctx.fireChannelReadComplete();
571     }
572 
573     /**
574      * Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
575      */
576     @Override
577     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
578         if (getEmbeddedHttp2Exception(cause) != null) {
579             // Some exception in the causality chain is an Http2Exception - handle it.
580             onError(ctx, false, cause);
581         } else {
582             super.exceptionCaught(ctx, cause);
583         }
584     }
585 
586     /**
587      * Closes the local side of the given stream. If this causes the stream to be closed, adds a
588      * hook to close the channel after the given future completes.
589      *
590      * @param stream the stream to be half closed.
591      * @param future If closing, the future after which to close the channel.
592      */
593     @Override
594     public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
595         switch (stream.state()) {
596             case HALF_CLOSED_LOCAL:
597             case OPEN:
598                 stream.closeLocalSide();
599                 break;
600             default:
601                 closeStream(stream, future);
602                 break;
603         }
604     }
605 
606     /**
607      * Closes the remote side of the given stream. If this causes the stream to be closed, adds a
608      * hook to close the channel after the given future completes.
609      *
610      * @param stream the stream to be half closed.
611      * @param future If closing, the future after which to close the channel.
612      */
613     @Override
614     public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
615         switch (stream.state()) {
616             case HALF_CLOSED_REMOTE:
617             case OPEN:
618                 stream.closeRemoteSide();
619                 break;
620             default:
621                 closeStream(stream, future);
622                 break;
623         }
624     }
625 
626     @Override
627     public void closeStream(final Http2Stream stream, ChannelFuture future) {
628         stream.close();
629 
630         if (future.isDone()) {
631             checkCloseConnection(future);
632         } else {
633             future.addListener(new ChannelFutureListener() {
634                 @Override
635                 public void operationComplete(ChannelFuture future) throws Exception {
636                     checkCloseConnection(future);
637                 }
638             });
639         }
640     }
641 
642     /**
643      * Central handler for all exceptions caught during HTTP/2 processing.
644      */
645     @Override
646     public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
647         Http2Exception embedded = getEmbeddedHttp2Exception(cause);
648         if (isStreamError(embedded)) {
649             onStreamError(ctx, outbound, cause, (StreamException) embedded);
650         } else if (embedded instanceof CompositeStreamException) {
651             CompositeStreamException compositException = (CompositeStreamException) embedded;
652             for (StreamException streamException : compositException) {
653                 onStreamError(ctx, outbound, cause, streamException);
654             }
655         } else {
656             onConnectionError(ctx, outbound, cause, embedded);
657         }
658         ctx.flush();
659     }
660 
661     /**
662      * Called by the graceful shutdown logic to determine when it is safe to close the connection. Returns {@code true}
663      * if the graceful shutdown has completed and the connection can be safely closed. This implementation just
664      * guarantees that there are no active streams. Subclasses may override to provide additional checks.
665      */
666     protected boolean isGracefulShutdownComplete() {
667         return connection().numActiveStreams() == 0;
668     }
669 
670     /**
671      * Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint. Once all
672      * streams are closed, the connection is shut down.
673      *
674      * @param ctx the channel context
675      * @param outbound {@code true} if the error was caused by an outbound operation.
676      * @param cause the exception that was caught
677      * @param http2Ex the {@link Http2Exception} that is embedded in the causality chain. This may
678      *            be {@code null} if it's an unknown exception.
679      */
680     protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
681                                      Throwable cause, Http2Exception http2Ex) {
682         if (http2Ex == null) {
683             http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
684         }
685 
686         ChannelPromise promise = ctx.newPromise();
687         ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
688         if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
689             doGracefulShutdown(ctx, future, promise);
690         } else {
691             future.addListener(newClosingChannelFutureListener(ctx, promise));
692         }
693     }
694 
695     /**
696      * Handler for a stream error. Sends a {@code RST_STREAM} frame to the remote endpoint and closes the
697      * stream.
698      *
699      * @param ctx the channel context
700      * @param outbound {@code true} if the error was caused by an outbound operation.
701      * @param cause the exception that was caught
702      * @param http2Ex the {@link StreamException} that is embedded in the causality chain.
703      */
704     protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
705                                  @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
706         final int streamId = http2Ex.streamId();
707         Http2Stream stream = connection().stream(streamId);
708 
709         //if this is caused by reading headers that are too large, send a header with status 431
710         if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
711             ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
712             connection().isServer()) {
713 
714             // NOTE We have to check to make sure that a stream exists before we send our reply.
715             // We likely always create the stream below as the stream isn't created until the
716             // header block is completely processed.
717 
718             // The case of a streamId referring to a stream which was already closed is handled
719             // by createStream and will land us in the catch block below
720             if (stream == null) {
721                 try {
722                     stream = encoder.connection().remote().createStream(streamId, true);
723                 } catch (Http2Exception e) {
724                     resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
725                     return;
726                 }
727             }
728 
729             // ensure that we have not already sent headers on this stream
730             if (stream != null && !stream.isHeadersSent()) {
731                 try {
732                     handleServerHeaderDecodeSizeError(ctx, stream);
733                 } catch (Throwable cause2) {
734                     onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
735                 }
736             }
737         }
738 
739         if (stream == null) {
740             if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
741                 resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
742             }
743         } else {
744             resetStream(ctx, stream, http2Ex.error().code(), ctx.newPromise());
745         }
746     }
747 
748     /**
749      * Notifies client that this server has received headers that are larger than what it is
750      * willing to accept. Override to change behavior.
751      *
752      * @param ctx the channel context
753      * @param stream the Http2Stream on which the header was received
754      */
755     protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
756         encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
757     }
758 
759     protected Http2FrameWriter frameWriter() {
760         return encoder().frameWriter();
761     }
762 
763     /**
764      * Sends a {@code RST_STREAM} frame even if we don't know about the stream. This error condition is most likely
765      * triggered by the first frame of a stream being invalid. That is, there was an error reading the frame before
766      * we could create a new stream.
767      */
768     private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
769                                              ChannelPromise promise) {
770         ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
771         if (future.isDone()) {
772             closeConnectionOnError(ctx, future);
773         } else {
774             future.addListener(new ChannelFutureListener() {
775                 @Override
776                 public void operationComplete(ChannelFuture future) throws Exception {
777                     closeConnectionOnError(ctx, future);
778                 }
779             });
780         }
781         return future;
782     }
783 
784     @Override
785     public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
786                                      ChannelPromise promise) {
787         final Http2Stream stream = connection().stream(streamId);
788         if (stream == null) {
789             return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
790         }
791 
792        return resetStream(ctx, stream, errorCode, promise);
793     }
794 
795     private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
796                                       long errorCode, ChannelPromise promise) {
797         promise = promise.unvoid();
798         if (stream.isResetSent()) {
799             // Don't write a RST_STREAM frame if we have already written one.
800             return promise.setSuccess();
801         }
802         // Synchronously set the resetSent flag to prevent any subsequent calls
803         // from resulting in multiple reset frames being sent.
804         //
805         // This needs to be done before we notify the promise as the promise may have a listener attached that
806         // call resetStream(...) again.
807         stream.resetSent();
808 
809         final ChannelFuture future;
810         // If the remote peer is not aware of the steam, then we are not allowed to send a RST_STREAM
811         // https://tools.ietf.org/html/rfc7540#section-6.4.
812         if (stream.state() == IDLE ||
813             connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
814             future = promise.setSuccess();
815         } else {
816             future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
817         }
818         if (future.isDone()) {
819             processRstStreamWriteResult(ctx, stream, future);
820         } else {
821             future.addListener(new ChannelFutureListener() {
822                 @Override
823                 public void operationComplete(ChannelFuture future) throws Exception {
824                     processRstStreamWriteResult(ctx, stream, future);
825                 }
826             });
827         }
828 
829         return future;
830     }
831 
832     @Override
833     public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
834                                 final ByteBuf debugData, ChannelPromise promise) {
835         promise = promise.unvoid();
836         final Http2Connection connection = connection();
837         try {
838             if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
839                 debugData.release();
840                 promise.trySuccess();
841                 return promise;
842             }
843         } catch (Throwable cause) {
844             debugData.release();
845             promise.tryFailure(cause);
846             return promise;
847         }
848 
849         // Need to retain before we write the buffer because if we do it after the refCnt could already be 0 and
850         // result in an IllegalRefCountException.
851         debugData.retain();
852         ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
853 
854         if (future.isDone()) {
855             processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
856         } else {
857             future.addListener(new ChannelFutureListener() {
858                 @Override
859                 public void operationComplete(ChannelFuture future) throws Exception {
860                     processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
861                 }
862             });
863         }
864 
865         return future;
866     }
867 
868     /**
869      * Closes the connection if the graceful shutdown process has completed.
870      * @param future Represents the status that will be passed to the {@link #closeListener}.
871      */
872     private void checkCloseConnection(ChannelFuture future) {
873         // If this connection is closing and the graceful shutdown has completed, close the connection
874         // once this operation completes.
875         if (closeListener != null && isGracefulShutdownComplete()) {
876             ChannelFutureListener closeListener = this.closeListener;
877             // This method could be called multiple times
878             // and we don't want to notify the closeListener multiple times.
879             this.closeListener = null;
880             try {
881                 closeListener.operationComplete(future);
882             } catch (Exception e) {
883                 throw new IllegalStateException("Close listener threw an unexpected exception", e);
884             }
885         }
886     }
887 
888     /**
889      * Close the remote endpoint with a {@code GO_AWAY} frame. Does <strong>not</strong> flush
890      * immediately, this is the responsibility of the caller.
891      */
892     private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
893         long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
894         int lastKnownStream;
895         if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
896             // The hard shutdown could have been triggered during header processing, before updating
897             // lastStreamCreated(). Specifically, any connection errors encountered by Http2FrameReader or HPACK
898             // decoding will fail to update the last known stream. So we must be pessimistic.
899             // https://github.com/netty/netty/issues/10670
900             lastKnownStream = Integer.MAX_VALUE;
901         } else {
902             lastKnownStream = connection().remote().lastStreamCreated();
903         }
904         return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
905     }
906 
907     private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
908         if (future.isSuccess()) {
909             closeStream(stream, future);
910         } else {
911             // The connection will be closed and so no need to change the resetSent flag to false.
912             onConnectionError(ctx, true, future.cause(), null);
913         }
914     }
915 
916     private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
917         if (!future.isSuccess()) {
918             onConnectionError(ctx, true, future.cause(), null);
919         }
920     }
921 
922     /**
923      * Returns the client preface string if this is a client connection, otherwise returns {@code null}.
924      */
925     private static ByteBuf clientPrefaceString(Http2Connection connection) {
926         return connection.isServer() ? connectionPrefaceBuf() : null;
927     }
928 
929     private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
930                                                  final long errorCode, final ByteBuf debugData, ChannelFuture future) {
931         try {
932             if (future.isSuccess()) {
933                 if (errorCode != NO_ERROR.code()) {
934                     if (logger.isDebugEnabled()) {
935                         logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
936                                      "debugData '{}'. Forcing shutdown of the connection.",
937                                      ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
938                     }
939                     ctx.close();
940                 }
941             } else {
942                 if (logger.isDebugEnabled()) {
943                     logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
944                                  "debugData '{}'. Forcing shutdown of the connection.",
945                                  ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
946                 }
947                 ctx.close();
948             }
949         } finally {
950             // We're done with the debug data now.
951             debugData.release();
952         }
953     }
954 
955     /**
956      * Closes the channel when the future completes.
957      */
958     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
959         private final ChannelHandlerContext ctx;
960         private final ChannelPromise promise;
961         private final Future<?> timeoutTask;
962         private boolean closed;
963 
964         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
965             this.ctx = ctx;
966             this.promise = promise;
967             timeoutTask = null;
968         }
969 
970         ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
971                                      long timeout, TimeUnit unit) {
972             this.ctx = ctx;
973             this.promise = promise;
974             timeoutTask = ctx.executor().schedule(new Runnable() {
975                 @Override
976                 public void run() {
977                     doClose();
978                 }
979             }, timeout, unit);
980         }
981 
982         @Override
983         public void operationComplete(ChannelFuture sentGoAwayFuture) {
984             if (timeoutTask != null) {
985                 timeoutTask.cancel(false);
986             }
987             doClose();
988         }
989 
990         private void doClose() {
991             // We need to guard against multiple calls as the timeout may trigger close() first and then it will be
992             // triggered again because of operationComplete(...) is called.
993             if (closed) {
994                 // This only happens if we also scheduled a timeout task.
995                 assert timeoutTask != null;
996                 return;
997             }
998             closed = true;
999             if (promise == null) {
1000                 ctx.close();
1001             } else {
1002                 ctx.close(promise);
1003             }
1004         }
1005     }
1006 }