View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufUtil;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelOutboundHandler;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.handler.codec.ByteToMessageDecoder;
26  import io.netty.handler.codec.http.HttpResponseStatus;
27  import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28  import io.netty.handler.codec.http2.Http2Exception.StreamException;
29  import io.netty.util.CharsetUtil;
30  import io.netty.util.concurrent.Future;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.net.SocketAddress;
35  import java.util.List;
36  import java.util.concurrent.TimeUnit;
37  
38  import static io.netty.buffer.ByteBufUtil.hexDump;
39  import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
40  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
41  import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
42  import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
43  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44  import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
45  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
46  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
47  import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
48  import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
49  import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
50  import static io.netty.util.CharsetUtil.UTF_8;
51  import static io.netty.util.internal.ObjectUtil.checkNotNull;
52  import static java.lang.Math.min;
53  import static java.util.concurrent.TimeUnit.MILLISECONDS;
54  
55  /**
56   * Provides the default implementation for processing inbound frame events and delegates to a
57   * {@link Http2FrameListener}
58   * <p>
59   * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
60   * <p>
61   * This interface enforces inbound flow control functionality through
62   * {@link Http2LocalFlowController}
63   */
64  public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
65                                                                              ChannelOutboundHandler {
66  
67      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
68  
69      private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
70              HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
71      private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
72          Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();
73  
74      private final Http2ConnectionDecoder decoder;
75      private final Http2ConnectionEncoder encoder;
76      private final Http2Settings initialSettings;
77      private final boolean decoupleCloseAndGoAway;
78      private final boolean flushPreface;
79      private ChannelFutureListener closeListener;
80      private BaseDecoder byteDecoder;
81      private long gracefulShutdownTimeoutMillis;
82  
83      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
84                                       Http2Settings initialSettings) {
85          this(decoder, encoder, initialSettings, false);
86      }
87  
88      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
89                                       Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
90          this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
91      }
92  
93      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
94                                       Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
95                                       boolean flushPreface) {
96          this.initialSettings = checkNotNull(initialSettings, "initialSettings");
97          this.decoder = checkNotNull(decoder, "decoder");
98          this.encoder = checkNotNull(encoder, "encoder");
99          this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
100         this.flushPreface = flushPreface;
101         if (encoder.connection() != decoder.connection()) {
102             throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
103         }
104     }
105 
106     /**
107      * Get the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
108      * the connection during the graceful shutdown process. Returns -1 if this connection is configured to wait
109      * indefinitely for all streams to close.
110      */
111     public long gracefulShutdownTimeoutMillis() {
112         return gracefulShutdownTimeoutMillis;
113     }
114 
115     /**
116      * Set the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
117      * the connection during the graceful shutdown process.
118      * @param gracefulShutdownTimeoutMillis the amount of time (in milliseconds) this endpoint will wait for all
119      * streams to be closed before closing the connection during the graceful shutdown process.
120      */
121     public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
122         if (gracefulShutdownTimeoutMillis < -1) {
123             throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
124                                                " (expected: -1 for indefinite or >= 0)");
125         }
126         this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
127     }
128 
129     public Http2Connection connection() {
130         return encoder.connection();
131     }
132 
133     public Http2ConnectionDecoder decoder() {
134         return decoder;
135     }
136 
137     public Http2ConnectionEncoder encoder() {
138         return encoder;
139     }
140 
141     private boolean prefaceSent() {
142         return byteDecoder != null && byteDecoder.prefaceSent();
143     }
144 
145     /**
146      * Handles the client-side (cleartext) upgrade from HTTP to HTTP/2.
147      * Reserves local stream 1 for the HTTP/2 response.
148      */
149     public void onHttpClientUpgrade() throws Http2Exception {
150         if (connection().isServer()) {
151             throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
152         }
153         if (!prefaceSent()) {
154             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
155             // calling this method.
156             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
157         }
158         if (decoder.prefaceReceived()) {
159             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
160         }
161 
162         // Create a local stream used for the HTTP cleartext upgrade.
163         connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
164     }
165 
166     /**
167      * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
168      * @param settings the settings for the remote endpoint.
169      */
170     public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
171         if (!connection().isServer()) {
172             throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
173         }
174         if (!prefaceSent()) {
175             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
176             // calling this method.
177             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
178         }
179         if (decoder.prefaceReceived()) {
180             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
181         }
182 
183         // Apply the settings but no ACK is necessary.
184         encoder.remoteSettings(settings);
185 
186         // Create a stream in the half-closed state.
187         connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
188     }
189 
190     @Override
191     public void flush(ChannelHandlerContext ctx) {
192         try {
193             // Trigger pending writes in the remote flow controller.
194             encoder.flowController().writePendingBytes();
195             ctx.flush();
196         } catch (Http2Exception e) {
197             onError(ctx, true, e);
198         } catch (Throwable cause) {
199             onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
200         }
201     }
202 
203     private abstract class BaseDecoder {
204         public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
205         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
206         public void channelActive(ChannelHandlerContext ctx) throws Exception { }
207 
208         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
209             // Connection has terminated, close the encoder and decoder.
210             encoder().close();
211             decoder().close();
212 
213             // We need to remove all streams (not just the active ones).
214             // See https://github.com/netty/netty/issues/4838.
215             connection().close(ctx.voidPromise());
216         }
217 
218         /**
219          * Determine if the HTTP/2 connection preface been sent.
220          */
221         public boolean prefaceSent() {
222             return true;
223         }
224 
225         /**
226          * Send the preface if needed.
227          *
228          * @param ctx           the {@link ChannelHandlerContext} to use.
229          * @throws Exception    thrown on error.
230          */
231         public void sendPrefaceIfNeeded(ChannelHandlerContext ctx) throws Exception {
232             // Noop by default.
233         }
234     }
235 
236     private final class PrefaceDecoder extends BaseDecoder {
237         private ByteBuf clientPrefaceString;
238         private boolean prefaceSent;
239 
240         PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
241             clientPrefaceString = clientPrefaceString(encoder.connection());
242             // This handler was just added to the context. In case it was handled after
243             // the connection became active, send the connection preface now.
244             sendPrefaceIfNeeded(ctx);
245         }
246 
247         @Override
248         public boolean prefaceSent() {
249             return prefaceSent;
250         }
251 
252         @Override
253         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
254             try {
255                 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
256                     // After the preface is read, it is time to hand over control to the post initialized decoder.
257                     byteDecoder = new FrameDecoder();
258                     byteDecoder.decode(ctx, in, out);
259                 }
260             } catch (Throwable e) {
261                 if (byteDecoder != null) {
262                     // Skip all bytes before we report the exception as
263                     in.skipBytes(in.readableBytes());
264                 }
265                 onError(ctx, false, e);
266             }
267         }
268 
269         @Override
270         public void channelActive(ChannelHandlerContext ctx) throws Exception {
271             // The channel just became active - send the connection preface to the remote endpoint.
272             sendPrefaceIfNeeded(ctx);
273         }
274 
275         @Override
276         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
277             cleanup();
278             super.channelInactive(ctx);
279         }
280 
281         /**
282          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
283          */
284         @Override
285         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
286             cleanup();
287         }
288 
289         /**
290          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
291          */
292         private void cleanup() {
293             if (clientPrefaceString != null) {
294                 clientPrefaceString.release();
295                 clientPrefaceString = null;
296             }
297         }
298 
299         /**
300          * Decodes the client connection preface string from the input buffer.
301          *
302          * @return {@code true} if processing of the client preface string is complete. Since client preface strings can
303          *         only be received by servers, returns true immediately for client endpoints.
304          */
305         private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
306             if (clientPrefaceString == null) {
307                 return true;
308             }
309 
310             int prefaceRemaining = clientPrefaceString.readableBytes();
311             int bytesRead = min(in.readableBytes(), prefaceRemaining);
312 
313             // If the input so far doesn't match the preface, break the connection.
314             if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
315                                                       clientPrefaceString, clientPrefaceString.readerIndex(),
316                                                       bytesRead)) {
317                 int maxSearch = 1024; // picked because 512 is too little, and 2048 too much
318                 int http1Index =
319                     ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
320                 if (http1Index != -1) {
321                     String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
322                     throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
323                 }
324                 String receivedBytes = hexDump(in, in.readerIndex(),
325                                                min(in.readableBytes(), clientPrefaceString.readableBytes()));
326                 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
327                                                       "Hex dump for received bytes: %s", receivedBytes);
328             }
329             in.skipBytes(bytesRead);
330             clientPrefaceString.skipBytes(bytesRead);
331 
332             if (!clientPrefaceString.isReadable()) {
333                 // Entire preface has been read.
334                 clientPrefaceString.release();
335                 clientPrefaceString = null;
336                 return true;
337             }
338             return false;
339         }
340 
341         /**
342          * Peeks at that the next frame in the buffer and verifies that it is a non-ack {@code SETTINGS} frame.
343          *
344          * @param in the inbound buffer.
345          * @return {@code true} if the next frame is a non-ack {@code SETTINGS} frame, {@code false} if more
346          * data is required before we can determine the next frame type.
347          * @throws Http2Exception thrown if the next frame is NOT a non-ack {@code SETTINGS} frame.
348          */
349         private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
350             if (in.readableBytes() < 5) {
351                 // Need more data before we can see the frame type for the first frame.
352                 return false;
353             }
354 
355             short frameType = in.getUnsignedByte(in.readerIndex() + 3);
356             if (frameType != SETTINGS) {
357                 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
358                                                       "Hex dump for first 5 bytes: %s",
359                                       hexDump(in, in.readerIndex(), 5));
360             }
361             short flags = in.getUnsignedByte(in.readerIndex() + 4);
362             if ((flags & Http2Flags.ACK) != 0) {
363                 throw connectionError(PROTOCOL_ERROR, "First received frame was SETTINGS frame but had ACK flag set. " +
364                         "Hex dump for first 5 bytes: %s",
365                         hexDump(in, in.readerIndex(), 5));
366             }
367             return true;
368         }
369 
370         /**
371          * Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent.
372          */
373         @Override
374         public void sendPrefaceIfNeeded(ChannelHandlerContext ctx) throws Exception {
375             if (prefaceSent || !ctx.channel().isActive()) {
376                 return;
377             }
378 
379             prefaceSent = true;
380 
381             final boolean isClient = !connection().isServer();
382             if (isClient) {
383                 // Clients must send the preface string as the first bytes on the connection.
384                 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
385             }
386 
387             // Both client and server must send their initial settings.
388             encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
389                     ChannelFutureListener.CLOSE_ON_FAILURE);
390 
391             try {
392                 if (isClient) {
393                     // If this handler is extended by the user and we directly fire the userEvent from this context then
394                     // the user will not see the event. We should fire the event starting with this handler so this
395                     // class (and extending classes) have a chance to process the event.
396                     userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
397                 }
398             } finally {
399                 if (flushPreface) {
400                     // As we don't know if any channelReadComplete() events will be triggered at all we need to ensure
401                     // we also flush. Otherwise the remote peer might never see the preface / settings frame.
402                     // See https://github.com/netty/netty/issues/12089
403                     ctx.flush();
404                 }
405             }
406         }
407     }
408 
409     private final class FrameDecoder extends BaseDecoder {
410         @Override
411         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
412             try {
413                 decoder.decodeFrame(ctx, in, out);
414             } catch (Throwable e) {
415                 onError(ctx, false, e);
416             }
417         }
418     }
419 
420     @Override
421     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
422         // Initialize the encoder, decoder, flow controllers, and internal state.
423         encoder.lifecycleManager(this);
424         decoder.lifecycleManager(this);
425         encoder.flowController().channelHandlerContext(ctx);
426         decoder.flowController().channelHandlerContext(ctx);
427         byteDecoder = new PrefaceDecoder(ctx);
428     }
429 
430     @Override
431     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
432         if (byteDecoder != null) {
433             byteDecoder.handlerRemoved(ctx);
434             byteDecoder = null;
435         }
436     }
437 
438     @Override
439     public void channelActive(ChannelHandlerContext ctx) throws Exception {
440         if (byteDecoder == null) {
441             byteDecoder = new PrefaceDecoder(ctx);
442         }
443         byteDecoder.channelActive(ctx);
444         super.channelActive(ctx);
445     }
446 
447     @Override
448     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
449         // Call super class first, as this may result in decode being called.
450         super.channelInactive(ctx);
451         if (byteDecoder != null) {
452             byteDecoder.channelInactive(ctx);
453             byteDecoder = null;
454         }
455     }
456 
457     @Override
458     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
459         // Writability is expected to change while we are writing. We cannot allow this event to trigger reentering
460         // the allocation and write loop. Reentering the event loop will lead to over or illegal allocation.
461         try {
462             if (ctx.channel().isWritable()) {
463                 flush(ctx);
464             }
465             encoder.flowController().channelWritabilityChanged();
466         } finally {
467             super.channelWritabilityChanged(ctx);
468         }
469     }
470 
471     @Override
472     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
473         byteDecoder.decode(ctx, in, out);
474     }
475 
476     @Override
477     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
478         // Ensure we send the preface before we notify the bind promise as the user might try to write
479         // directly in the listener attached to the promise and we need to ensure the preface is always the first
480         // thing that is written.
481         ctx.bind(localAddress, ctx.newPromise()).addListener(new PrefaceSendListener(ctx, promise));
482     }
483 
484     @Override
485     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
486                         ChannelPromise promise) throws Exception {
487         // Ensure we send the preface before we notify the connect promise as the user might try to write
488         // directly in the listener attached to the promise and we need to ensure the preface is always the first
489         // thing that is written.
490         ctx.connect(remoteAddress, localAddress, ctx.newPromise()).addListener(new PrefaceSendListener(ctx, promise));
491     }
492 
493     @Override
494     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
495         ctx.disconnect(promise);
496     }
497 
498     @Override
499     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
500         if (decoupleCloseAndGoAway) {
501             ctx.close(promise);
502             return;
503         }
504         promise = promise.unvoid();
505         // Avoid NotYetConnectedException and avoid sending before connection preface
506         if (!ctx.channel().isActive() || !prefaceSent()) {
507             ctx.close(promise);
508             return;
509         }
510 
511         // If the user has already sent a GO_AWAY frame they may be attempting to do a graceful shutdown which requires
512         // sending multiple GO_AWAY frames. We should only send a GO_AWAY here if one has not already been sent. If
513         // a GO_AWAY has been sent we send a empty buffer just so we can wait to close until all other data has been
514         // flushed to the OS.
515         // https://github.com/netty/netty/issues/5307
516         ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
517         ctx.flush();
518         doGracefulShutdown(ctx, f, promise);
519     }
520 
521     private ChannelFutureListener newClosingChannelFutureListener(
522             ChannelHandlerContext ctx, ChannelPromise promise) {
523         long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
524         return gracefulShutdownTimeoutMillis < 0 ?
525                 new ClosingChannelFutureListener(ctx, promise) :
526                 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
527     }
528 
529     private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
530         final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
531         if (isGracefulShutdownComplete()) {
532             // If there are no active streams, close immediately after the GO_AWAY write completes or the timeout
533             // elapsed.
534             future.addListener(listener);
535         } else {
536             // If there are active streams we should wait until they are all closed before closing the connection.
537 
538             // The ClosingChannelFutureListener will cascade promise completion. We need to always notify the
539             // new ClosingChannelFutureListener when the graceful close completes if the promise is not null.
540             if (closeListener == null) {
541                 closeListener = listener;
542             } else if (promise != null) {
543                 final ChannelFutureListener oldCloseListener = closeListener;
544                 closeListener = new ChannelFutureListener() {
545                     @Override
546                     public void operationComplete(ChannelFuture future) throws Exception {
547                         try {
548                             oldCloseListener.operationComplete(future);
549                         } finally {
550                             listener.operationComplete(future);
551                         }
552                     }
553                 };
554             }
555         }
556     }
557 
558     @Override
559     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
560         ctx.deregister(promise);
561     }
562 
563     @Override
564     public void read(ChannelHandlerContext ctx) throws Exception {
565         ctx.read();
566     }
567 
568     @Override
569     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
570         ctx.write(msg, promise);
571     }
572 
573     @Override
574     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
575         // Trigger flush after read on the assumption that flush is cheap if there is nothing to write and that
576         // for flow-control the read may release window that causes data to be written that can now be flushed.
577         try {
578             // First call channelReadComplete0(...) as this may produce more data that we want to flush
579             channelReadComplete0(ctx);
580         } finally {
581             flush(ctx);
582         }
583     }
584 
585     final void channelReadComplete0(ChannelHandlerContext ctx) {
586         // Discard bytes of the cumulation buffer if needed.
587         discardSomeReadBytes();
588 
589         // Ensure we never stale the HTTP/2 Channel. Flow-control is enforced by HTTP/2.
590         //
591         // See https://tools.ietf.org/html/rfc7540#section-5.2.2
592         if (!ctx.channel().config().isAutoRead()) {
593             ctx.read();
594         }
595 
596         ctx.fireChannelReadComplete();
597     }
598 
599     /**
600      * Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
601      */
602     @Override
603     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
604         if (getEmbeddedHttp2Exception(cause) != null) {
605             // Some exception in the causality chain is an Http2Exception - handle it.
606             onError(ctx, false, cause);
607         } else {
608             super.exceptionCaught(ctx, cause);
609         }
610     }
611 
612     /**
613      * Closes the local side of the given stream. If this causes the stream to be closed, adds a
614      * hook to close the channel after the given future completes.
615      *
616      * @param stream the stream to be half closed.
617      * @param future If closing, the future after which to close the channel.
618      */
619     @Override
620     public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
621         switch (stream.state()) {
622             case HALF_CLOSED_LOCAL:
623             case OPEN:
624                 stream.closeLocalSide();
625                 break;
626             default:
627                 closeStream(stream, future);
628                 break;
629         }
630     }
631 
632     /**
633      * Closes the remote side of the given stream. If this causes the stream to be closed, adds a
634      * hook to close the channel after the given future completes.
635      *
636      * @param stream the stream to be half closed.
637      * @param future If closing, the future after which to close the channel.
638      */
639     @Override
640     public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
641         switch (stream.state()) {
642             case HALF_CLOSED_REMOTE:
643             case OPEN:
644                 stream.closeRemoteSide();
645                 break;
646             default:
647                 closeStream(stream, future);
648                 break;
649         }
650     }
651 
652     @Override
653     public void closeStream(final Http2Stream stream, ChannelFuture future) {
654         if (future.isDone()) {
655             doCloseStream(stream, future);
656         } else {
657             future.addListener(new ChannelFutureListener() {
658                 @Override
659                 public void operationComplete(ChannelFuture future) {
660                     doCloseStream(stream, future);
661                 }
662             });
663         }
664     }
665 
666     /**
667      * Central handler for all exceptions caught during HTTP/2 processing.
668      */
669     @Override
670     public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
671         Http2Exception embedded = getEmbeddedHttp2Exception(cause);
672         if (isStreamError(embedded)) {
673             onStreamError(ctx, outbound, cause, (StreamException) embedded);
674         } else if (embedded instanceof CompositeStreamException) {
675             CompositeStreamException compositException = (CompositeStreamException) embedded;
676             for (StreamException streamException : compositException) {
677                 onStreamError(ctx, outbound, cause, streamException);
678             }
679         } else {
680             onConnectionError(ctx, outbound, cause, embedded);
681         }
682         ctx.flush();
683     }
684 
685     /**
686      * Called by the graceful shutdown logic to determine when it is safe to close the connection. Returns {@code true}
687      * if the graceful shutdown has completed and the connection can be safely closed. This implementation just
688      * guarantees that there are no active streams. Subclasses may override to provide additional checks.
689      */
690     protected boolean isGracefulShutdownComplete() {
691         return connection().numActiveStreams() == 0;
692     }
693 
694     /**
695      * Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint. Once all
696      * streams are closed, the connection is shut down.
697      *
698      * @param ctx the channel context
699      * @param outbound {@code true} if the error was caused by an outbound operation.
700      * @param cause the exception that was caught
701      * @param http2Ex the {@link Http2Exception} that is embedded in the causality chain. This may
702      *            be {@code null} if it's an unknown exception.
703      */
704     protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
705                                      Throwable cause, Http2Exception http2Ex) {
706         if (http2Ex == null) {
707             http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
708         }
709 
710         ChannelPromise promise = ctx.newPromise();
711         ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
712         if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
713             doGracefulShutdown(ctx, future, promise);
714         } else {
715             future.addListener(newClosingChannelFutureListener(ctx, promise));
716         }
717     }
718 
719     /**
720      * Handler for a stream error. Sends a {@code RST_STREAM} frame to the remote endpoint and closes the
721      * stream.
722      *
723      * @param ctx the channel context
724      * @param outbound {@code true} if the error was caused by an outbound operation.
725      * @param cause the exception that was caught
726      * @param http2Ex the {@link StreamException} that is embedded in the causality chain.
727      */
728     protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
729                                  @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
730         final int streamId = http2Ex.streamId();
731         Http2Stream stream = connection().stream(streamId);
732 
733         //if this is caused by reading headers that are too large, send a header with status 431
734         if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
735             ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
736             connection().isServer()) {
737 
738             // NOTE We have to check to make sure that a stream exists before we send our reply.
739             // We likely always create the stream below as the stream isn't created until the
740             // header block is completely processed.
741 
742             // The case of a streamId referring to a stream which was already closed is handled
743             // by createStream and will land us in the catch block below
744             if (stream == null) {
745                 try {
746                     stream = encoder.connection().remote().createStream(streamId, true);
747                 } catch (Http2Exception e) {
748                     encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
749                     return;
750                 }
751             }
752 
753             // ensure that we have not already sent headers on this stream
754             if (stream != null && !stream.isHeadersSent()) {
755                 try {
756                     handleServerHeaderDecodeSizeError(ctx, stream);
757                 } catch (Throwable cause2) {
758                     onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
759                 }
760             }
761         }
762 
763         if (stream == null) {
764             if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
765                 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
766             }
767         } else {
768             encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
769         }
770     }
771 
772     /**
773      * Notifies client that this server has received headers that are larger than what it is
774      * willing to accept. Override to change behavior.
775      *
776      * @param ctx the channel context
777      * @param stream the Http2Stream on which the header was received
778      */
779     protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
780         encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
781     }
782 
783     protected Http2FrameWriter frameWriter() {
784         return encoder().frameWriter();
785     }
786 
787     /**
788      * Sends a {@code RST_STREAM} frame even if we don't know about the stream. This error condition is most likely
789      * triggered by the first frame of a stream being invalid. That is, there was an error reading the frame before
790      * we could create a new stream.
791      */
792     private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
793                                              ChannelPromise promise) {
794         ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
795         if (future.isDone()) {
796             closeConnectionOnError(ctx, future);
797         } else {
798             future.addListener(new ChannelFutureListener() {
799                 @Override
800                 public void operationComplete(ChannelFuture future) throws Exception {
801                     closeConnectionOnError(ctx, future);
802                 }
803             });
804         }
805         return future;
806     }
807 
808     @Override
809     public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
810                                      ChannelPromise promise) {
811         final Http2Stream stream = connection().stream(streamId);
812         if (stream == null) {
813             return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
814         }
815 
816        return resetStream(ctx, stream, errorCode, promise);
817     }
818 
819     private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
820                                       long errorCode, ChannelPromise promise) {
821         promise = promise.unvoid();
822         if (stream.isResetSent()) {
823             // Don't write a RST_STREAM frame if we have already written one.
824             return promise.setSuccess();
825         }
826         // Synchronously set the resetSent flag to prevent any subsequent calls
827         // from resulting in multiple reset frames being sent.
828         //
829         // This needs to be done before we notify the promise as the promise may have a listener attached that
830         // call resetStream(...) again.
831         stream.resetSent();
832 
833         final ChannelFuture future;
834         // If the remote peer is not aware of the steam, then we are not allowed to send a RST_STREAM
835         // https://tools.ietf.org/html/rfc7540#section-6.4.
836         if (stream.state() == IDLE ||
837             connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
838             future = promise.setSuccess();
839         } else {
840             future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
841         }
842         if (future.isDone()) {
843             processRstStreamWriteResult(ctx, stream, future);
844         } else {
845             future.addListener(new ChannelFutureListener() {
846                 @Override
847                 public void operationComplete(ChannelFuture future) throws Exception {
848                     processRstStreamWriteResult(ctx, stream, future);
849                 }
850             });
851         }
852 
853         return future;
854     }
855 
856     @Override
857     public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
858                                 final ByteBuf debugData, ChannelPromise promise) {
859         promise = promise.unvoid();
860         final Http2Connection connection = connection();
861         try {
862             if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
863                 debugData.release();
864                 promise.trySuccess();
865                 return promise;
866             }
867         } catch (Throwable cause) {
868             debugData.release();
869             promise.tryFailure(cause);
870             return promise;
871         }
872 
873         // Need to retain before we write the buffer because if we do it after the refCnt could already be 0 and
874         // result in an IllegalRefCountException.
875         debugData.retain();
876         ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
877 
878         if (future.isDone()) {
879             processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
880         } else {
881             future.addListener(new ChannelFutureListener() {
882                 @Override
883                 public void operationComplete(ChannelFuture future) throws Exception {
884                     processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
885                 }
886             });
887         }
888 
889         return future;
890     }
891 
892     /**
893      * Closes the connection if the graceful shutdown process has completed.
894      * @param future Represents the status that will be passed to the {@link #closeListener}.
895      */
896     private void checkCloseConnection(ChannelFuture future) {
897         // If this connection is closing and the graceful shutdown has completed, close the connection
898         // once this operation completes.
899         if (closeListener != null && isGracefulShutdownComplete()) {
900             ChannelFutureListener closeListener = this.closeListener;
901             // This method could be called multiple times
902             // and we don't want to notify the closeListener multiple times.
903             this.closeListener = null;
904             try {
905                 closeListener.operationComplete(future);
906             } catch (Exception e) {
907                 throw new IllegalStateException("Close listener threw an unexpected exception", e);
908             }
909         }
910     }
911 
912     /**
913      * Close the remote endpoint with a {@code GO_AWAY} frame. Does <strong>not</strong> flush
914      * immediately, this is the responsibility of the caller.
915      */
916     private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
917         long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
918         int lastKnownStream;
919         if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
920             // The hard shutdown could have been triggered during header processing, before updating
921             // lastStreamCreated(). Specifically, any connection errors encountered by Http2FrameReader or HPACK
922             // decoding will fail to update the last known stream. So we must be pessimistic.
923             // https://github.com/netty/netty/issues/10670
924             lastKnownStream = Integer.MAX_VALUE;
925         } else {
926             lastKnownStream = connection().remote().lastStreamCreated();
927         }
928         return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
929     }
930 
931     private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
932         if (future.isSuccess()) {
933             closeStream(stream, future);
934         } else {
935             // The connection will be closed and so no need to change the resetSent flag to false.
936             onConnectionError(ctx, true, future.cause(), null);
937         }
938     }
939 
940     private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
941         if (!future.isSuccess()) {
942             onConnectionError(ctx, true, future.cause(), null);
943         }
944     }
945 
946     private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
947         stream.close();
948         checkCloseConnection(future);
949     }
950 
951     /**
952      * Returns the client preface string if this is a client connection, otherwise returns {@code null}.
953      */
954     private static ByteBuf clientPrefaceString(Http2Connection connection) {
955         return connection.isServer() ? connectionPrefaceBuf() : null;
956     }
957 
958     private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
959                                                  final long errorCode, final ByteBuf debugData, ChannelFuture future) {
960         try {
961             if (future.isSuccess()) {
962                 if (errorCode != NO_ERROR.code()) {
963                     if (logger.isDebugEnabled()) {
964                         logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
965                                      "debugData '{}'. Forcing shutdown of the connection.",
966                                      ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
967                     }
968                     ctx.close();
969                 }
970             } else {
971                 if (logger.isDebugEnabled()) {
972                     logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
973                                  "debugData '{}'. Forcing shutdown of the connection.",
974                                  ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
975                 }
976                 ctx.close();
977             }
978         } finally {
979             // We're done with the debug data now.
980             debugData.release();
981         }
982     }
983 
984     /**
985      * Closes the channel when the future completes.
986      */
987     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
988         private final ChannelHandlerContext ctx;
989         private final ChannelPromise promise;
990         private final Future<?> timeoutTask;
991         private boolean closed;
992 
993         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
994             this.ctx = ctx;
995             this.promise = promise;
996             timeoutTask = null;
997         }
998 
999         ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
1000                                      long timeout, TimeUnit unit) {
1001             this.ctx = ctx;
1002             this.promise = promise;
1003             timeoutTask = ctx.executor().schedule(new Runnable() {
1004                 @Override
1005                 public void run() {
1006                     doClose();
1007                 }
1008             }, timeout, unit);
1009         }
1010 
1011         @Override
1012         public void operationComplete(ChannelFuture sentGoAwayFuture) {
1013             if (timeoutTask != null) {
1014                 timeoutTask.cancel(false);
1015             }
1016             doClose();
1017         }
1018 
1019         private void doClose() {
1020             // We need to guard against multiple calls as the timeout may trigger close() first and then it will be
1021             // triggered again because of operationComplete(...) is called.
1022             if (closed) {
1023                 // This only happens if we also scheduled a timeout task.
1024                 assert timeoutTask != null;
1025                 return;
1026             }
1027             closed = true;
1028             if (promise == null) {
1029                 ctx.close();
1030             } else {
1031                 ctx.close(promise);
1032             }
1033         }
1034     }
1035 
1036     private final class PrefaceSendListener implements ChannelFutureListener {
1037         private final ChannelHandlerContext ctx;
1038         private final ChannelPromise promise;
1039 
1040         PrefaceSendListener(ChannelHandlerContext ctx, ChannelPromise promise) {
1041             this.ctx = ctx;
1042             this.promise = promise;
1043         }
1044 
1045         @Override
1046         public void operationComplete(ChannelFuture f) {
1047             if (f.isSuccess()) {
1048                 try {
1049                     if (byteDecoder != null) {
1050                         byteDecoder.sendPrefaceIfNeeded(ctx);
1051                     }
1052                 } catch (Throwable e) {
1053                     promise.setFailure(e);
1054                     return;
1055                 }
1056                 promise.setSuccess();
1057             } else {
1058                 promise.setFailure(f.cause());
1059             }
1060         }
1061     }
1062 }