1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelOutboundHandler;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.http.HttpResponseStatus;
27 import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28 import io.netty.handler.codec.http2.Http2Exception.StreamException;
29 import io.netty.util.CharsetUtil;
30 import io.netty.util.concurrent.Future;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.net.SocketAddress;
35 import java.util.List;
36 import java.util.concurrent.TimeUnit;
37
38 import static io.netty.buffer.ByteBufUtil.hexDump;
39 import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
40 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
41 import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
42 import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
43 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44 import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
45 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
46 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
47 import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
48 import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
49 import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
50 import static io.netty.util.CharsetUtil.UTF_8;
51 import static io.netty.util.internal.ObjectUtil.checkNotNull;
52 import static java.lang.Math.min;
53 import static java.util.concurrent.TimeUnit.MILLISECONDS;
54
55
56
57
58
59
60
61
62
63
64 public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
65 ChannelOutboundHandler {
66
// Logger used for the debug output emitted after a GOAWAY write completes.
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);

// Response headers written by handleServerHeaderDecodeSizeError(...); they carry the
// REQUEST_HEADER_FIELDS_TOO_LARGE status code.
private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
        HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
// The ASCII bytes "HTTP/1." searched for in inbound data when the client preface does not match,
// so that an HTTP/1.x request can be reported with a dedicated error message.
private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
        Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();

// Decoder and encoder; the constructor verifies that both share the same connection object.
private final Http2ConnectionDecoder decoder;
private final Http2ConnectionEncoder encoder;
// Settings written to the peer as part of the preface (see PrefaceDecoder.sendPreface).
private final Http2Settings initialSettings;
// When true, close(...) forwards the close directly instead of writing a GOAWAY first.
private final boolean decoupleCloseAndGoAway;
// When true, the context is flushed right after the preface and settings have been written.
private final boolean flushPreface;
// Listener to invoke once the graceful shutdown completes; null while no shutdown is pending.
private ChannelFutureListener closeListener;
// Current inbound decoding strategy (PrefaceDecoder first, then FrameDecoder); null when none is installed.
private BaseDecoder byteDecoder;
// Graceful shutdown timeout in milliseconds; a negative value means no timeout task is scheduled.
private long gracefulShutdownTimeoutMillis;
82
83 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
84 Http2Settings initialSettings) {
85 this(decoder, encoder, initialSettings, false);
86 }
87
88 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
89 Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
90 this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
91 }
92
93 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
94 Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
95 boolean flushPreface) {
96 this.initialSettings = checkNotNull(initialSettings, "initialSettings");
97 this.decoder = checkNotNull(decoder, "decoder");
98 this.encoder = checkNotNull(encoder, "encoder");
99 this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
100 this.flushPreface = flushPreface;
101 if (encoder.connection() != decoder.connection()) {
102 throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
103 }
104 }
105
106
107
108
109
110
111 public long gracefulShutdownTimeoutMillis() {
112 return gracefulShutdownTimeoutMillis;
113 }
114
115
116
117
118
119
120
121 public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
122 if (gracefulShutdownTimeoutMillis < -1) {
123 throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
124 " (expected: -1 for indefinite or >= 0)");
125 }
126 this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
127 }
128
129 public Http2Connection connection() {
130 return encoder.connection();
131 }
132
133 public Http2ConnectionDecoder decoder() {
134 return decoder;
135 }
136
137 public Http2ConnectionEncoder encoder() {
138 return encoder;
139 }
140
141 private boolean prefaceSent() {
142 return byteDecoder != null && byteDecoder.prefaceSent();
143 }
144
145
146
147
148
149 public void onHttpClientUpgrade() throws Http2Exception {
150 if (connection().isServer()) {
151 throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
152 }
153 if (!prefaceSent()) {
154
155
156 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
157 }
158 if (decoder.prefaceReceived()) {
159 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
160 }
161
162
163 connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
164 }
165
166
167
168
169
170 public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
171 if (!connection().isServer()) {
172 throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
173 }
174 if (!prefaceSent()) {
175
176
177 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
178 }
179 if (decoder.prefaceReceived()) {
180 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
181 }
182
183
184 encoder.remoteSettings(settings);
185
186
187 connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
188 }
189
190 @Override
191 public void flush(ChannelHandlerContext ctx) {
192 try {
193
194 encoder.flowController().writePendingBytes();
195 ctx.flush();
196 } catch (Http2Exception e) {
197 onError(ctx, true, e);
198 } catch (Throwable cause) {
199 onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
200 }
201 }
202
203 private abstract class BaseDecoder {
204 public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
205 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
206 public void channelActive(ChannelHandlerContext ctx) throws Exception { }
207
208 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
209
210 encoder().close();
211 decoder().close();
212
213
214
215 connection().close(ctx.voidPromise());
216 }
217
218
219
220
221 public boolean prefaceSent() {
222 return true;
223 }
224 }
225
226 private final class PrefaceDecoder extends BaseDecoder {
227 private ByteBuf clientPrefaceString;
228 private boolean prefaceSent;
229
230 PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
231 clientPrefaceString = clientPrefaceString(encoder.connection());
232
233
234 sendPreface(ctx);
235 }
236
237 @Override
238 public boolean prefaceSent() {
239 return prefaceSent;
240 }
241
242 @Override
243 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
244 try {
245 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
246
247 byteDecoder = new FrameDecoder();
248 byteDecoder.decode(ctx, in, out);
249 }
250 } catch (Throwable e) {
251 if (byteDecoder != null) {
252
253 in.skipBytes(in.readableBytes());
254 }
255 onError(ctx, false, e);
256 }
257 }
258
259 @Override
260 public void channelActive(ChannelHandlerContext ctx) throws Exception {
261
262 sendPreface(ctx);
263 }
264
265 @Override
266 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
267 cleanup();
268 super.channelInactive(ctx);
269 }
270
271
272
273
274 @Override
275 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
276 cleanup();
277 }
278
279
280
281
282 private void cleanup() {
283 if (clientPrefaceString != null) {
284 clientPrefaceString.release();
285 clientPrefaceString = null;
286 }
287 }
288
289
290
291
292
293
294
295 private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
296 if (clientPrefaceString == null) {
297 return true;
298 }
299
300 int prefaceRemaining = clientPrefaceString.readableBytes();
301 int bytesRead = min(in.readableBytes(), prefaceRemaining);
302
303
304 if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
305 clientPrefaceString, clientPrefaceString.readerIndex(),
306 bytesRead)) {
307 int maxSearch = 1024;
308 int http1Index =
309 ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
310 if (http1Index != -1) {
311 String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
312 throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
313 }
314 String receivedBytes = hexDump(in, in.readerIndex(),
315 min(in.readableBytes(), clientPrefaceString.readableBytes()));
316 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
317 "Hex dump for received bytes: %s", receivedBytes);
318 }
319 in.skipBytes(bytesRead);
320 clientPrefaceString.skipBytes(bytesRead);
321
322 if (!clientPrefaceString.isReadable()) {
323
324 clientPrefaceString.release();
325 clientPrefaceString = null;
326 return true;
327 }
328 return false;
329 }
330
331
332
333
334
335
336
337
338
339 private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
340 if (in.readableBytes() < 5) {
341
342 return false;
343 }
344
345 short frameType = in.getUnsignedByte(in.readerIndex() + 3);
346 if (frameType != SETTINGS) {
347 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
348 "Hex dump for first 5 bytes: %s",
349 hexDump(in, in.readerIndex(), 5));
350 }
351 short flags = in.getUnsignedByte(in.readerIndex() + 4);
352 if ((flags & Http2Flags.ACK) != 0) {
353 throw connectionError(PROTOCOL_ERROR, "First received frame was SETTINGS frame but had ACK flag set. " +
354 "Hex dump for first 5 bytes: %s",
355 hexDump(in, in.readerIndex(), 5));
356 }
357 return true;
358 }
359
360
361
362
363 private void sendPreface(ChannelHandlerContext ctx) throws Exception {
364 if (prefaceSent || !ctx.channel().isActive()) {
365 return;
366 }
367
368 prefaceSent = true;
369
370 final boolean isClient = !connection().isServer();
371 if (isClient) {
372
373 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
374 }
375
376
377 encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
378 ChannelFutureListener.CLOSE_ON_FAILURE);
379
380 try {
381 if (isClient) {
382
383
384
385 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
386 }
387 } finally {
388 if (flushPreface) {
389
390
391
392 ctx.flush();
393 }
394 }
395 }
396 }
397
398 private final class FrameDecoder extends BaseDecoder {
399 @Override
400 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
401 try {
402 decoder.decodeFrame(ctx, in, out);
403 } catch (Throwable e) {
404 onError(ctx, false, e);
405 }
406 }
407 }
408
409 @Override
410 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
411
412 encoder.lifecycleManager(this);
413 decoder.lifecycleManager(this);
414 encoder.flowController().channelHandlerContext(ctx);
415 decoder.flowController().channelHandlerContext(ctx);
416 byteDecoder = new PrefaceDecoder(ctx);
417 }
418
419 @Override
420 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
421 if (byteDecoder != null) {
422 byteDecoder.handlerRemoved(ctx);
423 byteDecoder = null;
424 }
425 }
426
427 @Override
428 public void channelActive(ChannelHandlerContext ctx) throws Exception {
429 if (byteDecoder == null) {
430 byteDecoder = new PrefaceDecoder(ctx);
431 }
432 byteDecoder.channelActive(ctx);
433 super.channelActive(ctx);
434 }
435
436 @Override
437 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
438
439 super.channelInactive(ctx);
440 if (byteDecoder != null) {
441 byteDecoder.channelInactive(ctx);
442 byteDecoder = null;
443 }
444 }
445
446 @Override
447 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
448
449
450 try {
451 if (ctx.channel().isWritable()) {
452 flush(ctx);
453 }
454 encoder.flowController().channelWritabilityChanged();
455 } finally {
456 super.channelWritabilityChanged(ctx);
457 }
458 }
459
460 @Override
461 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
462 byteDecoder.decode(ctx, in, out);
463 }
464
465 @Override
466 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
467 ctx.bind(localAddress, promise);
468 }
469
470 @Override
471 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
472 ChannelPromise promise) throws Exception {
473 ctx.connect(remoteAddress, localAddress, promise);
474 }
475
476 @Override
477 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
478 ctx.disconnect(promise);
479 }
480
481 @Override
482 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
483 if (decoupleCloseAndGoAway) {
484 ctx.close(promise);
485 return;
486 }
487 promise = promise.unvoid();
488
489 if (!ctx.channel().isActive() || !prefaceSent()) {
490 ctx.close(promise);
491 return;
492 }
493
494
495
496
497
498
499 ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
500 ctx.flush();
501 doGracefulShutdown(ctx, f, promise);
502 }
503
504 private ChannelFutureListener newClosingChannelFutureListener(
505 ChannelHandlerContext ctx, ChannelPromise promise) {
506 long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
507 return gracefulShutdownTimeoutMillis < 0 ?
508 new ClosingChannelFutureListener(ctx, promise) :
509 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
510 }
511
512 private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
513 final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
514 if (isGracefulShutdownComplete()) {
515
516
517 future.addListener(listener);
518 } else {
519
520
521
522
523 if (closeListener == null) {
524 closeListener = listener;
525 } else if (promise != null) {
526 final ChannelFutureListener oldCloseListener = closeListener;
527 closeListener = future1 -> {
528 try {
529 oldCloseListener.operationComplete(future1);
530 } finally {
531 listener.operationComplete(future1);
532 }
533 };
534 }
535 }
536 }
537
538 @Override
539 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
540 ctx.deregister(promise);
541 }
542
543 @Override
544 public void read(ChannelHandlerContext ctx) throws Exception {
545 ctx.read();
546 }
547
548 @Override
549 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
550 ctx.write(msg, promise);
551 }
552
553 @Override
554 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
555
556
557 try {
558
559 channelReadComplete0(ctx);
560 } finally {
561 flush(ctx);
562 }
563 }
564
565 final void channelReadComplete0(ChannelHandlerContext ctx) {
566
567 discardSomeReadBytes();
568
569
570
571
572 if (!ctx.channel().config().isAutoRead()) {
573 ctx.read();
574 }
575
576 ctx.fireChannelReadComplete();
577 }
578
579
580
581
582 @Override
583 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
584 if (getEmbeddedHttp2Exception(cause) != null) {
585
586 onError(ctx, false, cause);
587 } else {
588 super.exceptionCaught(ctx, cause);
589 }
590 }
591
592
593
594
595
596
597
598
599 @Override
600 public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
601 switch (stream.state()) {
602 case HALF_CLOSED_LOCAL:
603 case OPEN:
604 stream.closeLocalSide();
605 break;
606 default:
607 closeStream(stream, future);
608 break;
609 }
610 }
611
612
613
614
615
616
617
618
619 @Override
620 public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
621 switch (stream.state()) {
622 case HALF_CLOSED_REMOTE:
623 case OPEN:
624 stream.closeRemoteSide();
625 break;
626 default:
627 closeStream(stream, future);
628 break;
629 }
630 }
631
632 @Override
633 public void closeStream(final Http2Stream stream, ChannelFuture future) {
634 if (future.isDone()) {
635 doCloseStream(stream, future);
636 } else {
637 future.addListener((ChannelFutureListener) future1 -> doCloseStream(stream, future1));
638 }
639 }
640
641
642
643
644 @Override
645 public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
646 Http2Exception embedded = getEmbeddedHttp2Exception(cause);
647 if (isStreamError(embedded)) {
648 onStreamError(ctx, outbound, cause, (StreamException) embedded);
649 } else if (embedded instanceof CompositeStreamException) {
650 CompositeStreamException compositException = (CompositeStreamException) embedded;
651 for (StreamException streamException : compositException) {
652 onStreamError(ctx, outbound, cause, streamException);
653 }
654 } else {
655 onConnectionError(ctx, outbound, cause, embedded);
656 }
657 ctx.flush();
658 }
659
660
661
662
663
664
665 protected boolean isGracefulShutdownComplete() {
666 return connection().numActiveStreams() == 0;
667 }
668
669
670
671
672
673
674
675
676
677
678
679 protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
680 Throwable cause, Http2Exception http2Ex) {
681 if (http2Ex == null) {
682 http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
683 }
684
685 ChannelPromise promise = ctx.newPromise();
686 ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
687 if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
688 doGracefulShutdown(ctx, future, promise);
689 } else {
690 future.addListener(newClosingChannelFutureListener(ctx, promise));
691 }
692 }
693
694
695
696
697
698
699
700
701
702
703 protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
704 @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
705 final int streamId = http2Ex.streamId();
706 Http2Stream stream = connection().stream(streamId);
707
708
709 if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
710 ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
711 connection().isServer()) {
712
713
714
715
716
717
718
719 if (stream == null) {
720 try {
721 stream = encoder.connection().remote().createStream(streamId, true);
722 } catch (Http2Exception e) {
723 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
724 return;
725 }
726 }
727
728
729 if (stream != null && !stream.isHeadersSent()) {
730 try {
731 handleServerHeaderDecodeSizeError(ctx, stream);
732 } catch (Throwable cause2) {
733 onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
734 }
735 }
736 }
737
738 if (stream == null) {
739 if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
740 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
741 }
742 } else {
743 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
744 }
745 }
746
747
748
749
750
751
752
753
754 protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
755 encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
756 }
757
758 protected Http2FrameWriter frameWriter() {
759 return encoder().frameWriter();
760 }
761
762
763
764
765
766
767 private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
768 ChannelPromise promise) {
769 ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
770 if (future.isDone()) {
771 closeConnectionOnError(ctx, future);
772 } else {
773 future.addListener((ChannelFutureListener) f -> closeConnectionOnError(ctx, f));
774 }
775 return future;
776 }
777
778 @Override
779 public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
780 ChannelPromise promise) {
781 final Http2Stream stream = connection().stream(streamId);
782 if (stream == null) {
783 return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
784 }
785
786 return resetStream(ctx, stream, errorCode, promise);
787 }
788
789 private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
790 long errorCode, ChannelPromise promise) {
791 promise = promise.unvoid();
792 if (stream.isResetSent()) {
793
794 return promise.setSuccess();
795 }
796
797
798
799
800
801 stream.resetSent();
802
803 final ChannelFuture future;
804
805
806 if (stream.state() == IDLE ||
807 connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
808 future = promise.setSuccess();
809 } else {
810 future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
811 }
812 if (future.isDone()) {
813 processRstStreamWriteResult(ctx, stream, future);
814 } else {
815 future.addListener((ChannelFutureListener) f -> processRstStreamWriteResult(ctx, stream, f));
816 }
817
818 return future;
819 }
820
821 @Override
822 public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
823 final ByteBuf debugData, ChannelPromise promise) {
824 promise = promise.unvoid();
825 final Http2Connection connection = connection();
826 try {
827 if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
828 debugData.release();
829 promise.trySuccess();
830 return promise;
831 }
832 } catch (Throwable cause) {
833 debugData.release();
834 promise.tryFailure(cause);
835 return promise;
836 }
837
838
839
840 debugData.retain();
841 ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
842
843 if (future.isDone()) {
844 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
845 } else {
846 future.addListener((ChannelFutureListener) f ->
847 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, f));
848 }
849
850 return future;
851 }
852
853
854
855
856
857 private void checkCloseConnection(ChannelFuture future) {
858
859
860 if (closeListener != null && isGracefulShutdownComplete()) {
861 ChannelFutureListener closeListener = this.closeListener;
862
863
864 this.closeListener = null;
865 try {
866 closeListener.operationComplete(future);
867 } catch (Exception e) {
868 throw new IllegalStateException("Close listener threw an unexpected exception", e);
869 }
870 }
871 }
872
873
874
875
876
877 private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
878 long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
879 int lastKnownStream;
880 if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
881
882
883
884
885 lastKnownStream = Integer.MAX_VALUE;
886 } else {
887 lastKnownStream = connection().remote().lastStreamCreated();
888 }
889 return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
890 }
891
892 private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
893 if (future.isSuccess()) {
894 closeStream(stream, future);
895 } else {
896
897 onConnectionError(ctx, true, future.cause(), null);
898 }
899 }
900
901 private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
902 if (!future.isSuccess()) {
903 onConnectionError(ctx, true, future.cause(), null);
904 }
905 }
906
907 private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
908 stream.close();
909 checkCloseConnection(future);
910 }
911
912
913
914
915 private static ByteBuf clientPrefaceString(Http2Connection connection) {
916 return connection.isServer() ? connectionPrefaceBuf() : null;
917 }
918
919 private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
920 final long errorCode, final ByteBuf debugData, ChannelFuture future) {
921 try {
922 if (future.isSuccess()) {
923 if (errorCode != NO_ERROR.code()) {
924 if (logger.isDebugEnabled()) {
925 logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
926 "debugData '{}'. Forcing shutdown of the connection.",
927 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8));
928 }
929 ctx.close();
930 }
931 } else {
932 if (logger.isDebugEnabled()) {
933 logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
934 "debugData '{}'. Forcing shutdown of the connection.",
935 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
936 }
937 ctx.close();
938 }
939 } finally {
940
941 debugData.release();
942 }
943 }
944
945
946
947
948 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
949 private final ChannelHandlerContext ctx;
950 private final ChannelPromise promise;
951 private final Future<?> timeoutTask;
952 private boolean closed;
953
954 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
955 this.ctx = ctx;
956 this.promise = promise;
957 timeoutTask = null;
958 }
959
960 ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
961 long timeout, TimeUnit unit) {
962 this.ctx = ctx;
963 this.promise = promise;
964 timeoutTask = ctx.executor().schedule(new Runnable() {
965 @Override
966 public void run() {
967 doClose();
968 }
969 }, timeout, unit);
970 }
971
972 @Override
973 public void operationComplete(ChannelFuture sentGoAwayFuture) {
974 if (timeoutTask != null) {
975 timeoutTask.cancel(false);
976 }
977 doClose();
978 }
979
980 private void doClose() {
981
982
983 if (closed) {
984
985 assert timeoutTask != null;
986 return;
987 }
988 closed = true;
989 if (promise == null) {
990 ctx.close();
991 } else {
992 ctx.close(promise);
993 }
994 }
995 }
996 }