View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufUtil;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelOutboundHandler;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.handler.codec.ByteToMessageDecoder;
26  import io.netty.handler.codec.http.HttpResponseStatus;
27  import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28  import io.netty.handler.codec.http2.Http2Exception.StreamException;
29  import io.netty.util.CharsetUtil;
30  import io.netty.util.concurrent.Future;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.net.SocketAddress;
35  import java.util.List;
36  import java.util.concurrent.TimeUnit;
37  
38  import static io.netty.buffer.ByteBufUtil.hexDump;
39  import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
40  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
41  import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
42  import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
43  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44  import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
45  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
46  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
47  import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
48  import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
49  import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
50  import static io.netty.util.CharsetUtil.UTF_8;
51  import static io.netty.util.internal.ObjectUtil.checkNotNull;
52  import static java.lang.Math.min;
53  import static java.util.concurrent.TimeUnit.MILLISECONDS;
54  
/**
 * Provides the default implementation for processing inbound frame events and delegates to a
 * {@link Http2FrameListener}.
 * <p>
 * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}.
 * <p>
 * This class enforces inbound flow control functionality through
 * {@link Http2LocalFlowController}.
 */
64  public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
65                                                                              ChannelOutboundHandler {
66  
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);

    // Server response headers built from the 431 (Request Header Fields Too Large) status code.
    private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
            HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
    // Read-only, unreleasable buffer holding the ASCII bytes "HTTP/1."; searched for in the inbound data when
    // the received bytes do not match the client preface string.
    private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
        Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();

    // Decoder and encoder must share the same connection object (enforced by the constructor).
    private final Http2ConnectionDecoder decoder;
    private final Http2ConnectionEncoder encoder;
    // Settings written to the remote endpoint as part of the connection preface.
    private final Http2Settings initialSettings;
    // When true, close(...) is forwarded directly without sending GO_AWAY or waiting for streams.
    private final boolean decoupleCloseAndGoAway;
    // When true, the preface is flushed explicitly once the channel becomes active.
    private final boolean flushPreface;
    // Listener stored by doGracefulShutdown(...) while the graceful shutdown is not yet complete.
    private ChannelFutureListener closeListener;
    // Current decoding state: a PrefaceDecoder first, then a FrameDecoder once the preface has been read.
    // Reset to null when the handler is removed or the channel becomes inactive.
    private BaseDecoder byteDecoder;
    // Graceful shutdown timeout in milliseconds; a negative value selects a closing listener without timeout.
    private long gracefulShutdownTimeoutMillis;
82  
83      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
84                                       Http2Settings initialSettings) {
85          this(decoder, encoder, initialSettings, false);
86      }
87  
88      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
89                                       Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
90          this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
91      }
92  
93      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
94                                       Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
95                                       boolean flushPreface) {
96          this.initialSettings = checkNotNull(initialSettings, "initialSettings");
97          this.decoder = checkNotNull(decoder, "decoder");
98          this.encoder = checkNotNull(encoder, "encoder");
99          this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
100         this.flushPreface = flushPreface;
101         if (encoder.connection() != decoder.connection()) {
102             throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
103         }
104     }
105 
106     /**
107      * Get the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
108      * the connection during the graceful shutdown process. Returns -1 if this connection is configured to wait
109      * indefinitely for all streams to close.
110      */
111     public long gracefulShutdownTimeoutMillis() {
112         return gracefulShutdownTimeoutMillis;
113     }
114 
115     /**
116      * Set the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
117      * the connection during the graceful shutdown process.
118      * @param gracefulShutdownTimeoutMillis the amount of time (in milliseconds) this endpoint will wait for all
119      * streams to be closed before closing the connection during the graceful shutdown process.
120      */
121     public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
122         if (gracefulShutdownTimeoutMillis < -1) {
123             throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
124                                                " (expected: -1 for indefinite or >= 0)");
125         }
126         this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
127     }
128 
129     public Http2Connection connection() {
130         return encoder.connection();
131     }
132 
133     public Http2ConnectionDecoder decoder() {
134         return decoder;
135     }
136 
137     public Http2ConnectionEncoder encoder() {
138         return encoder;
139     }
140 
141     private boolean prefaceSent() {
142         return byteDecoder != null && byteDecoder.prefaceSent();
143     }
144 
145     /**
146      * Handles the client-side (cleartext) upgrade from HTTP to HTTP/2.
147      * Reserves local stream 1 for the HTTP/2 response.
148      */
149     public void onHttpClientUpgrade() throws Http2Exception {
150         if (connection().isServer()) {
151             throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
152         }
153         if (!prefaceSent()) {
154             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
155             // calling this method.
156             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
157         }
158         if (decoder.prefaceReceived()) {
159             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
160         }
161 
162         // Create a local stream used for the HTTP cleartext upgrade.
163         connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
164     }
165 
166     /**
167      * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
168      * @param settings the settings for the remote endpoint.
169      */
170     public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
171         if (!connection().isServer()) {
172             throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
173         }
174         if (!prefaceSent()) {
175             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
176             // calling this method.
177             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
178         }
179         if (decoder.prefaceReceived()) {
180             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
181         }
182 
183         // Apply the settings but no ACK is necessary.
184         encoder.remoteSettings(settings);
185 
186         // Create a stream in the half-closed state.
187         connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
188     }
189 
190     @Override
191     public void flush(ChannelHandlerContext ctx) {
192         try {
193             // Trigger pending writes in the remote flow controller.
194             encoder.flowController().writePendingBytes();
195             ctx.flush();
196         } catch (Http2Exception e) {
197             onError(ctx, true, e);
198         } catch (Throwable cause) {
199             onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
200         }
201     }
202 
203     private abstract class BaseDecoder {
204         public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
205         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
206         public void channelActive(ChannelHandlerContext ctx) throws Exception { }
207 
208         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
209             // Connection has terminated, close the encoder and decoder.
210             encoder().close();
211             decoder().close();
212 
213             // We need to remove all streams (not just the active ones).
214             // See https://github.com/netty/netty/issues/4838.
215             connection().close(ctx.voidPromise());
216         }
217 
218         /**
219          * Determine if the HTTP/2 connection preface been sent.
220          */
221         public boolean prefaceSent() {
222             return true;
223         }
224     }
225 
226     private final class PrefaceDecoder extends BaseDecoder {
227         private ByteBuf clientPrefaceString;
228         private boolean prefaceSent;
229 
230         PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
231             clientPrefaceString = clientPrefaceString(encoder.connection());
232             // This handler was just added to the context. In case it was handled after
233             // the connection became active, send the connection preface now.
234             sendPreface(ctx);
235         }
236 
237         @Override
238         public boolean prefaceSent() {
239             return prefaceSent;
240         }
241 
242         @Override
243         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
244             try {
245                 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
246                     // After the preface is read, it is time to hand over control to the post initialized decoder.
247                     byteDecoder = new FrameDecoder();
248                     byteDecoder.decode(ctx, in, out);
249                 }
250             } catch (Throwable e) {
251                 onError(ctx, false, e);
252             }
253         }
254 
255         @Override
256         public void channelActive(ChannelHandlerContext ctx) throws Exception {
257             // The channel just became active - send the connection preface to the remote endpoint.
258             sendPreface(ctx);
259 
260             if (flushPreface) {
261                 // As we don't know if any channelReadComplete() events will be triggered at all we need to ensure we
262                 // also flush. Otherwise the remote peer might never see the preface / settings frame.
263                 // See https://github.com/netty/netty/issues/12089
264                 ctx.flush();
265             }
266         }
267 
268         @Override
269         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
270             cleanup();
271             super.channelInactive(ctx);
272         }
273 
274         /**
275          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
276          */
277         @Override
278         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
279             cleanup();
280         }
281 
282         /**
283          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
284          */
285         private void cleanup() {
286             if (clientPrefaceString != null) {
287                 clientPrefaceString.release();
288                 clientPrefaceString = null;
289             }
290         }
291 
292         /**
293          * Decodes the client connection preface string from the input buffer.
294          *
295          * @return {@code true} if processing of the client preface string is complete. Since client preface strings can
296          *         only be received by servers, returns true immediately for client endpoints.
297          */
298         private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
299             if (clientPrefaceString == null) {
300                 return true;
301             }
302 
303             int prefaceRemaining = clientPrefaceString.readableBytes();
304             int bytesRead = min(in.readableBytes(), prefaceRemaining);
305 
306             // If the input so far doesn't match the preface, break the connection.
307             if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
308                                                       clientPrefaceString, clientPrefaceString.readerIndex(),
309                                                       bytesRead)) {
310                 int maxSearch = 1024; // picked because 512 is too little, and 2048 too much
311                 int http1Index =
312                     ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
313                 if (http1Index != -1) {
314                     String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
315                     throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
316                 }
317                 String receivedBytes = hexDump(in, in.readerIndex(),
318                                                min(in.readableBytes(), clientPrefaceString.readableBytes()));
319                 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
320                                                       "Hex dump for received bytes: %s", receivedBytes);
321             }
322             in.skipBytes(bytesRead);
323             clientPrefaceString.skipBytes(bytesRead);
324 
325             if (!clientPrefaceString.isReadable()) {
326                 // Entire preface has been read.
327                 clientPrefaceString.release();
328                 clientPrefaceString = null;
329                 return true;
330             }
331             return false;
332         }
333 
334         /**
335          * Peeks at that the next frame in the buffer and verifies that it is a non-ack {@code SETTINGS} frame.
336          *
337          * @param in the inbound buffer.
338          * @return {@code true} if the next frame is a non-ack {@code SETTINGS} frame, {@code false} if more
339          * data is required before we can determine the next frame type.
340          * @throws Http2Exception thrown if the next frame is NOT a non-ack {@code SETTINGS} frame.
341          */
342         private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
343             if (in.readableBytes() < 5) {
344                 // Need more data before we can see the frame type for the first frame.
345                 return false;
346             }
347 
348             short frameType = in.getUnsignedByte(in.readerIndex() + 3);
349             short flags = in.getUnsignedByte(in.readerIndex() + 4);
350             if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {
351                 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
352                                                       "Hex dump for first 5 bytes: %s",
353                                       hexDump(in, in.readerIndex(), 5));
354             }
355             return true;
356         }
357 
358         /**
359          * Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent.
360          */
361         private void sendPreface(ChannelHandlerContext ctx) throws Exception {
362             if (prefaceSent || !ctx.channel().isActive()) {
363                 return;
364             }
365 
366             prefaceSent = true;
367 
368             final boolean isClient = !connection().isServer();
369             if (isClient) {
370                 // Clients must send the preface string as the first bytes on the connection.
371                 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
372             }
373 
374             // Both client and server must send their initial settings.
375             encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
376                     ChannelFutureListener.CLOSE_ON_FAILURE);
377 
378             if (isClient) {
379                 // If this handler is extended by the user and we directly fire the userEvent from this context then
380                 // the user will not see the event. We should fire the event starting with this handler so this class
381                 // (and extending classes) have a chance to process the event.
382                 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
383             }
384         }
385     }
386 
387     private final class FrameDecoder extends BaseDecoder {
388         @Override
389         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
390             try {
391                 decoder.decodeFrame(ctx, in, out);
392             } catch (Throwable e) {
393                 onError(ctx, false, e);
394             }
395         }
396     }
397 
398     @Override
399     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
400         // Initialize the encoder, decoder, flow controllers, and internal state.
401         encoder.lifecycleManager(this);
402         decoder.lifecycleManager(this);
403         encoder.flowController().channelHandlerContext(ctx);
404         decoder.flowController().channelHandlerContext(ctx);
405         byteDecoder = new PrefaceDecoder(ctx);
406     }
407 
408     @Override
409     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
410         if (byteDecoder != null) {
411             byteDecoder.handlerRemoved(ctx);
412             byteDecoder = null;
413         }
414     }
415 
416     @Override
417     public void channelActive(ChannelHandlerContext ctx) throws Exception {
418         if (byteDecoder == null) {
419             byteDecoder = new PrefaceDecoder(ctx);
420         }
421         byteDecoder.channelActive(ctx);
422         super.channelActive(ctx);
423     }
424 
425     @Override
426     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
427         // Call super class first, as this may result in decode being called.
428         super.channelInactive(ctx);
429         if (byteDecoder != null) {
430             byteDecoder.channelInactive(ctx);
431             byteDecoder = null;
432         }
433     }
434 
435     @Override
436     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
437         // Writability is expected to change while we are writing. We cannot allow this event to trigger reentering
438         // the allocation and write loop. Reentering the event loop will lead to over or illegal allocation.
439         try {
440             if (ctx.channel().isWritable()) {
441                 flush(ctx);
442             }
443             encoder.flowController().channelWritabilityChanged();
444         } finally {
445             super.channelWritabilityChanged(ctx);
446         }
447     }
448 
449     @Override
450     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
451         byteDecoder.decode(ctx, in, out);
452     }
453 
454     @Override
455     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
456         ctx.bind(localAddress, promise);
457     }
458 
459     @Override
460     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
461                         ChannelPromise promise) throws Exception {
462         ctx.connect(remoteAddress, localAddress, promise);
463     }
464 
465     @Override
466     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
467         ctx.disconnect(promise);
468     }
469 
470     @Override
471     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
472         if (decoupleCloseAndGoAway) {
473             ctx.close(promise);
474             return;
475         }
476         promise = promise.unvoid();
477         // Avoid NotYetConnectedException and avoid sending before connection preface
478         if (!ctx.channel().isActive() || !prefaceSent()) {
479             ctx.close(promise);
480             return;
481         }
482 
483         // If the user has already sent a GO_AWAY frame they may be attempting to do a graceful shutdown which requires
484         // sending multiple GO_AWAY frames. We should only send a GO_AWAY here if one has not already been sent. If
485         // a GO_AWAY has been sent we send a empty buffer just so we can wait to close until all other data has been
486         // flushed to the OS.
487         // https://github.com/netty/netty/issues/5307
488         ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
489         ctx.flush();
490         doGracefulShutdown(ctx, f, promise);
491     }
492 
493     private ChannelFutureListener newClosingChannelFutureListener(
494             ChannelHandlerContext ctx, ChannelPromise promise) {
495         long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
496         return gracefulShutdownTimeoutMillis < 0 ?
497                 new ClosingChannelFutureListener(ctx, promise) :
498                 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
499     }
500 
501     private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
502         final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
503         if (isGracefulShutdownComplete()) {
504             // If there are no active streams, close immediately after the GO_AWAY write completes or the timeout
505             // elapsed.
506             future.addListener(listener);
507         } else {
508             // If there are active streams we should wait until they are all closed before closing the connection.
509 
510             // The ClosingChannelFutureListener will cascade promise completion. We need to always notify the
511             // new ClosingChannelFutureListener when the graceful close completes if the promise is not null.
512             if (closeListener == null) {
513                 closeListener = listener;
514             } else if (promise != null) {
515                 final ChannelFutureListener oldCloseListener = closeListener;
516                 closeListener = new ChannelFutureListener() {
517                     @Override
518                     public void operationComplete(ChannelFuture future) throws Exception {
519                         try {
520                             oldCloseListener.operationComplete(future);
521                         } finally {
522                             listener.operationComplete(future);
523                         }
524                     }
525                 };
526             }
527         }
528     }
529 
530     @Override
531     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
532         ctx.deregister(promise);
533     }
534 
535     @Override
536     public void read(ChannelHandlerContext ctx) throws Exception {
537         ctx.read();
538     }
539 
540     @Override
541     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
542         ctx.write(msg, promise);
543     }
544 
545     @Override
546     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
547         // Trigger flush after read on the assumption that flush is cheap if there is nothing to write and that
548         // for flow-control the read may release window that causes data to be written that can now be flushed.
549         try {
550             // First call channelReadComplete0(...) as this may produce more data that we want to flush
551             channelReadComplete0(ctx);
552         } finally {
553             flush(ctx);
554         }
555     }
556 
557     final void channelReadComplete0(ChannelHandlerContext ctx) {
558         // Discard bytes of the cumulation buffer if needed.
559         discardSomeReadBytes();
560 
561         // Ensure we never stale the HTTP/2 Channel. Flow-control is enforced by HTTP/2.
562         //
563         // See https://tools.ietf.org/html/rfc7540#section-5.2.2
564         if (!ctx.channel().config().isAutoRead()) {
565             ctx.read();
566         }
567 
568         ctx.fireChannelReadComplete();
569     }
570 
571     /**
572      * Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
573      */
574     @Override
575     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
576         if (getEmbeddedHttp2Exception(cause) != null) {
577             // Some exception in the causality chain is an Http2Exception - handle it.
578             onError(ctx, false, cause);
579         } else {
580             super.exceptionCaught(ctx, cause);
581         }
582     }
583 
584     /**
585      * Closes the local side of the given stream. If this causes the stream to be closed, adds a
586      * hook to close the channel after the given future completes.
587      *
588      * @param stream the stream to be half closed.
589      * @param future If closing, the future after which to close the channel.
590      */
591     @Override
592     public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
593         switch (stream.state()) {
594             case HALF_CLOSED_LOCAL:
595             case OPEN:
596                 stream.closeLocalSide();
597                 break;
598             default:
599                 closeStream(stream, future);
600                 break;
601         }
602     }
603 
604     /**
605      * Closes the remote side of the given stream. If this causes the stream to be closed, adds a
606      * hook to close the channel after the given future completes.
607      *
608      * @param stream the stream to be half closed.
609      * @param future If closing, the future after which to close the channel.
610      */
611     @Override
612     public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
613         switch (stream.state()) {
614             case HALF_CLOSED_REMOTE:
615             case OPEN:
616                 stream.closeRemoteSide();
617                 break;
618             default:
619                 closeStream(stream, future);
620                 break;
621         }
622     }
623 
624     @Override
625     public void closeStream(final Http2Stream stream, ChannelFuture future) {
626         if (future.isDone()) {
627             doCloseStream(stream, future);
628         } else {
629             future.addListener(new ChannelFutureListener() {
630                 @Override
631                 public void operationComplete(ChannelFuture future) {
632                     doCloseStream(stream, future);
633                 }
634             });
635         }
636     }
637 
638     /**
639      * Central handler for all exceptions caught during HTTP/2 processing.
640      */
641     @Override
642     public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
643         Http2Exception embedded = getEmbeddedHttp2Exception(cause);
644         if (isStreamError(embedded)) {
645             onStreamError(ctx, outbound, cause, (StreamException) embedded);
646         } else if (embedded instanceof CompositeStreamException) {
647             CompositeStreamException compositException = (CompositeStreamException) embedded;
648             for (StreamException streamException : compositException) {
649                 onStreamError(ctx, outbound, cause, streamException);
650             }
651         } else {
652             onConnectionError(ctx, outbound, cause, embedded);
653         }
654         ctx.flush();
655     }
656 
657     /**
658      * Called by the graceful shutdown logic to determine when it is safe to close the connection. Returns {@code true}
659      * if the graceful shutdown has completed and the connection can be safely closed. This implementation just
660      * guarantees that there are no active streams. Subclasses may override to provide additional checks.
661      */
662     protected boolean isGracefulShutdownComplete() {
663         return connection().numActiveStreams() == 0;
664     }
665 
666     /**
667      * Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint. Once all
668      * streams are closed, the connection is shut down.
669      *
670      * @param ctx the channel context
671      * @param outbound {@code true} if the error was caused by an outbound operation.
672      * @param cause the exception that was caught
673      * @param http2Ex the {@link Http2Exception} that is embedded in the causality chain. This may
674      *            be {@code null} if it's an unknown exception.
675      */
676     protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
677                                      Throwable cause, Http2Exception http2Ex) {
678         if (http2Ex == null) {
679             http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
680         }
681 
682         ChannelPromise promise = ctx.newPromise();
683         ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
684         if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
685             doGracefulShutdown(ctx, future, promise);
686         } else {
687             future.addListener(newClosingChannelFutureListener(ctx, promise));
688         }
689     }
690 
691     /**
692      * Handler for a stream error. Sends a {@code RST_STREAM} frame to the remote endpoint and closes the
693      * stream.
694      *
695      * @param ctx the channel context
696      * @param outbound {@code true} if the error was caused by an outbound operation.
697      * @param cause the exception that was caught
698      * @param http2Ex the {@link StreamException} that is embedded in the causality chain.
699      */
700     protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
701                                  @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
702         final int streamId = http2Ex.streamId();
703         Http2Stream stream = connection().stream(streamId);
704 
705         //if this is caused by reading headers that are too large, send a header with status 431
706         if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
707             ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
708             connection().isServer()) {
709 
710             // NOTE We have to check to make sure that a stream exists before we send our reply.
711             // We likely always create the stream below as the stream isn't created until the
712             // header block is completely processed.
713 
714             // The case of a streamId referring to a stream which was already closed is handled
715             // by createStream and will land us in the catch block below
716             if (stream == null) {
717                 try {
718                     stream = encoder.connection().remote().createStream(streamId, true);
719                 } catch (Http2Exception e) {
720                     resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
721                     return;
722                 }
723             }
724 
725             // ensure that we have not already sent headers on this stream
726             if (stream != null && !stream.isHeadersSent()) {
727                 try {
728                     handleServerHeaderDecodeSizeError(ctx, stream);
729                 } catch (Throwable cause2) {
730                     onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
731                 }
732             }
733         }
734 
735         if (stream == null) {
736             if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
737                 resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
738             }
739         } else {
740             resetStream(ctx, stream, http2Ex.error().code(), ctx.newPromise());
741         }
742     }
743 
744     /**
745      * Notifies client that this server has received headers that are larger than what it is
746      * willing to accept. Override to change behavior.
747      *
748      * @param ctx the channel context
749      * @param stream the Http2Stream on which the header was received
750      */
751     protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
752         encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
753     }
754 
755     protected Http2FrameWriter frameWriter() {
756         return encoder().frameWriter();
757     }
758 
759     /**
760      * Sends a {@code RST_STREAM} frame even if we don't know about the stream. This error condition is most likely
761      * triggered by the first frame of a stream being invalid. That is, there was an error reading the frame before
762      * we could create a new stream.
763      */
764     private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
765                                              ChannelPromise promise) {
766         ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
767         if (future.isDone()) {
768             closeConnectionOnError(ctx, future);
769         } else {
770             future.addListener(new ChannelFutureListener() {
771                 @Override
772                 public void operationComplete(ChannelFuture future) throws Exception {
773                     closeConnectionOnError(ctx, future);
774                 }
775             });
776         }
777         return future;
778     }
779 
780     @Override
781     public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
782                                      ChannelPromise promise) {
783         final Http2Stream stream = connection().stream(streamId);
784         if (stream == null) {
785             return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
786         }
787 
788        return resetStream(ctx, stream, errorCode, promise);
789     }
790 
791     private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
792                                       long errorCode, ChannelPromise promise) {
793         promise = promise.unvoid();
794         if (stream.isResetSent()) {
795             // Don't write a RST_STREAM frame if we have already written one.
796             return promise.setSuccess();
797         }
798         // Synchronously set the resetSent flag to prevent any subsequent calls
799         // from resulting in multiple reset frames being sent.
800         //
801         // This needs to be done before we notify the promise as the promise may have a listener attached that
802         // call resetStream(...) again.
803         stream.resetSent();
804 
805         final ChannelFuture future;
806         // If the remote peer is not aware of the steam, then we are not allowed to send a RST_STREAM
807         // https://tools.ietf.org/html/rfc7540#section-6.4.
808         if (stream.state() == IDLE ||
809             connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
810             future = promise.setSuccess();
811         } else {
812             future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
813         }
814         if (future.isDone()) {
815             processRstStreamWriteResult(ctx, stream, future);
816         } else {
817             future.addListener(new ChannelFutureListener() {
818                 @Override
819                 public void operationComplete(ChannelFuture future) throws Exception {
820                     processRstStreamWriteResult(ctx, stream, future);
821                 }
822             });
823         }
824 
825         return future;
826     }
827 
828     @Override
829     public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
830                                 final ByteBuf debugData, ChannelPromise promise) {
831         promise = promise.unvoid();
832         final Http2Connection connection = connection();
833         try {
834             if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
835                 debugData.release();
836                 promise.trySuccess();
837                 return promise;
838             }
839         } catch (Throwable cause) {
840             debugData.release();
841             promise.tryFailure(cause);
842             return promise;
843         }
844 
845         // Need to retain before we write the buffer because if we do it after the refCnt could already be 0 and
846         // result in an IllegalRefCountException.
847         debugData.retain();
848         ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
849 
850         if (future.isDone()) {
851             processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
852         } else {
853             future.addListener(new ChannelFutureListener() {
854                 @Override
855                 public void operationComplete(ChannelFuture future) throws Exception {
856                     processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
857                 }
858             });
859         }
860 
861         return future;
862     }
863 
864     /**
865      * Closes the connection if the graceful shutdown process has completed.
866      * @param future Represents the status that will be passed to the {@link #closeListener}.
867      */
868     private void checkCloseConnection(ChannelFuture future) {
869         // If this connection is closing and the graceful shutdown has completed, close the connection
870         // once this operation completes.
871         if (closeListener != null && isGracefulShutdownComplete()) {
872             ChannelFutureListener closeListener = this.closeListener;
873             // This method could be called multiple times
874             // and we don't want to notify the closeListener multiple times.
875             this.closeListener = null;
876             try {
877                 closeListener.operationComplete(future);
878             } catch (Exception e) {
879                 throw new IllegalStateException("Close listener threw an unexpected exception", e);
880             }
881         }
882     }
883 
884     /**
885      * Close the remote endpoint with a {@code GO_AWAY} frame. Does <strong>not</strong> flush
886      * immediately, this is the responsibility of the caller.
887      */
888     private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
889         long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
890         int lastKnownStream;
891         if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
892             // The hard shutdown could have been triggered during header processing, before updating
893             // lastStreamCreated(). Specifically, any connection errors encountered by Http2FrameReader or HPACK
894             // decoding will fail to update the last known stream. So we must be pessimistic.
895             // https://github.com/netty/netty/issues/10670
896             lastKnownStream = Integer.MAX_VALUE;
897         } else {
898             lastKnownStream = connection().remote().lastStreamCreated();
899         }
900         return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
901     }
902 
903     private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
904         if (future.isSuccess()) {
905             closeStream(stream, future);
906         } else {
907             // The connection will be closed and so no need to change the resetSent flag to false.
908             onConnectionError(ctx, true, future.cause(), null);
909         }
910     }
911 
912     private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
913         if (!future.isSuccess()) {
914             onConnectionError(ctx, true, future.cause(), null);
915         }
916     }
917 
918     private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
919         stream.close();
920         checkCloseConnection(future);
921     }
922 
923     /**
924      * Returns the client preface string if this is a client connection, otherwise returns {@code null}.
925      */
926     private static ByteBuf clientPrefaceString(Http2Connection connection) {
927         return connection.isServer() ? connectionPrefaceBuf() : null;
928     }
929 
930     private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
931                                                  final long errorCode, final ByteBuf debugData, ChannelFuture future) {
932         try {
933             if (future.isSuccess()) {
934                 if (errorCode != NO_ERROR.code()) {
935                     if (logger.isDebugEnabled()) {
936                         logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
937                                      "debugData '{}'. Forcing shutdown of the connection.",
938                                      ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
939                     }
940                     ctx.close();
941                 }
942             } else {
943                 if (logger.isDebugEnabled()) {
944                     logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
945                                  "debugData '{}'. Forcing shutdown of the connection.",
946                                  ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
947                 }
948                 ctx.close();
949             }
950         } finally {
951             // We're done with the debug data now.
952             debugData.release();
953         }
954     }
955 
956     /**
957      * Closes the channel when the future completes.
958      */
959     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
960         private final ChannelHandlerContext ctx;
961         private final ChannelPromise promise;
962         private final Future<?> timeoutTask;
963         private boolean closed;
964 
965         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
966             this.ctx = ctx;
967             this.promise = promise;
968             timeoutTask = null;
969         }
970 
971         ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
972                                      long timeout, TimeUnit unit) {
973             this.ctx = ctx;
974             this.promise = promise;
975             timeoutTask = ctx.executor().schedule(new Runnable() {
976                 @Override
977                 public void run() {
978                     doClose();
979                 }
980             }, timeout, unit);
981         }
982 
983         @Override
984         public void operationComplete(ChannelFuture sentGoAwayFuture) {
985             if (timeoutTask != null) {
986                 timeoutTask.cancel(false);
987             }
988             doClose();
989         }
990 
991         private void doClose() {
992             // We need to guard against multiple calls as the timeout may trigger close() first and then it will be
993             // triggered again because of operationComplete(...) is called.
994             if (closed) {
995                 // This only happens if we also scheduled a timeout task.
996                 assert timeoutTask != null;
997                 return;
998             }
999             closed = true;
1000             if (promise == null) {
1001                 ctx.close();
1002             } else {
1003                 ctx.close(promise);
1004             }
1005         }
1006     }
1007 }