1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelOutboundHandler;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.http.HttpResponseStatus;
27 import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28 import io.netty.handler.codec.http2.Http2Exception.StreamException;
29 import io.netty.util.CharsetUtil;
30 import io.netty.util.concurrent.Future;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.net.SocketAddress;
35 import java.util.List;
36 import java.util.concurrent.TimeUnit;
37
38 import static io.netty.buffer.ByteBufUtil.hexDump;
39 import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
40 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
41 import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
42 import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
43 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44 import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
45 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
46 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
47 import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
48 import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
49 import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
50 import static io.netty.util.CharsetUtil.UTF_8;
51 import static io.netty.util.internal.ObjectUtil.checkNotNull;
52 import static java.lang.Math.min;
53 import static java.util.concurrent.TimeUnit.MILLISECONDS;
54
55
56
57
58
59
60
61
62
63
64 public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
65 ChannelOutboundHandler {
66
67 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
68
69 private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
70 HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
71 private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
72 Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();
73
74 private final Http2ConnectionDecoder decoder;
75 private final Http2ConnectionEncoder encoder;
76 private final Http2Settings initialSettings;
77 private final boolean decoupleCloseAndGoAway;
78 private final boolean flushPreface;
79 private ChannelFutureListener closeListener;
80 private BaseDecoder byteDecoder;
81 private long gracefulShutdownTimeoutMillis;
82
/**
 * Creates a handler that couples {@code close} with GOAWAY and flushes the preface on activation.
 *
 * @param decoder         the inbound decoder, must not be {@code null}
 * @param encoder         the outbound encoder, must not be {@code null} and must share the decoder's connection
 * @param initialSettings the settings sent with the preface, must not be {@code null}
 */
protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
                                 Http2Settings initialSettings) {
    // Same defaults the chained constructors would apply: decoupleCloseAndGoAway=false, flushPreface=true.
    this(decoder, encoder, initialSettings, false, true);
}
87
/**
 * Creates a handler that flushes the preface on activation.
 *
 * @param decoder                the inbound decoder, must not be {@code null}
 * @param encoder                the outbound encoder, must not be {@code null}
 * @param initialSettings        the settings sent with the preface, must not be {@code null}
 * @param decoupleCloseAndGoAway {@code true} to make {@code close} skip the GOAWAY/graceful-shutdown logic
 */
protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
                                 Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
    // flushPreface defaults to true.
    this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
}
92
93 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
94 Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
95 boolean flushPreface) {
96 this.initialSettings = checkNotNull(initialSettings, "initialSettings");
97 this.decoder = checkNotNull(decoder, "decoder");
98 this.encoder = checkNotNull(encoder, "encoder");
99 this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
100 this.flushPreface = flushPreface;
101 if (encoder.connection() != decoder.connection()) {
102 throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
103 }
104 }
105
106
107
108
109
110
111 public long gracefulShutdownTimeoutMillis() {
112 return gracefulShutdownTimeoutMillis;
113 }
114
115
116
117
118
119
120
121 public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
122 if (gracefulShutdownTimeoutMillis < -1) {
123 throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
124 " (expected: -1 for indefinite or >= 0)");
125 }
126 this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
127 }
128
129 public Http2Connection connection() {
130 return encoder.connection();
131 }
132
133 public Http2ConnectionDecoder decoder() {
134 return decoder;
135 }
136
137 public Http2ConnectionEncoder encoder() {
138 return encoder;
139 }
140
141 private boolean prefaceSent() {
142 return byteDecoder != null && byteDecoder.prefaceSent();
143 }
144
145
146
147
148
149 public void onHttpClientUpgrade() throws Http2Exception {
150 if (connection().isServer()) {
151 throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
152 }
153 if (!prefaceSent()) {
154
155
156 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
157 }
158 if (decoder.prefaceReceived()) {
159 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
160 }
161
162
163 connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
164 }
165
166
167
168
169
170 public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
171 if (!connection().isServer()) {
172 throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
173 }
174 if (!prefaceSent()) {
175
176
177 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
178 }
179 if (decoder.prefaceReceived()) {
180 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
181 }
182
183
184 encoder.remoteSettings(settings);
185
186
187 connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
188 }
189
190 @Override
191 public void flush(ChannelHandlerContext ctx) {
192 try {
193
194 encoder.flowController().writePendingBytes();
195 ctx.flush();
196 } catch (Http2Exception e) {
197 onError(ctx, true, e);
198 } catch (Throwable cause) {
199 onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
200 }
201 }
202
203 private abstract class BaseDecoder {
204 public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
205 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
206 public void channelActive(ChannelHandlerContext ctx) throws Exception { }
207
208 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
209
210 encoder().close();
211 decoder().close();
212
213
214
215 connection().close(ctx.voidPromise());
216 }
217
218
219
220
221 public boolean prefaceSent() {
222 return true;
223 }
224 }
225
226 private final class PrefaceDecoder extends BaseDecoder {
227 private ByteBuf clientPrefaceString;
228 private boolean prefaceSent;
229
230 PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
231 clientPrefaceString = clientPrefaceString(encoder.connection());
232
233
234 sendPreface(ctx);
235 }
236
237 @Override
238 public boolean prefaceSent() {
239 return prefaceSent;
240 }
241
242 @Override
243 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
244 try {
245 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
246
247 byteDecoder = new FrameDecoder();
248 byteDecoder.decode(ctx, in, out);
249 }
250 } catch (Throwable e) {
251 if (byteDecoder != null) {
252
253 in.skipBytes(in.readableBytes());
254 }
255 onError(ctx, false, e);
256 }
257 }
258
259 @Override
260 public void channelActive(ChannelHandlerContext ctx) throws Exception {
261
262 sendPreface(ctx);
263
264 if (flushPreface) {
265
266
267
268 ctx.flush();
269 }
270 }
271
272 @Override
273 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
274 cleanup();
275 super.channelInactive(ctx);
276 }
277
278
279
280
281 @Override
282 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
283 cleanup();
284 }
285
286
287
288
289 private void cleanup() {
290 if (clientPrefaceString != null) {
291 clientPrefaceString.release();
292 clientPrefaceString = null;
293 }
294 }
295
296
297
298
299
300
301
302 private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
303 if (clientPrefaceString == null) {
304 return true;
305 }
306
307 int prefaceRemaining = clientPrefaceString.readableBytes();
308 int bytesRead = min(in.readableBytes(), prefaceRemaining);
309
310
311 if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
312 clientPrefaceString, clientPrefaceString.readerIndex(),
313 bytesRead)) {
314 int maxSearch = 1024;
315 int http1Index =
316 ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
317 if (http1Index != -1) {
318 String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
319 throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
320 }
321 String receivedBytes = hexDump(in, in.readerIndex(),
322 min(in.readableBytes(), clientPrefaceString.readableBytes()));
323 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
324 "Hex dump for received bytes: %s", receivedBytes);
325 }
326 in.skipBytes(bytesRead);
327 clientPrefaceString.skipBytes(bytesRead);
328
329 if (!clientPrefaceString.isReadable()) {
330
331 clientPrefaceString.release();
332 clientPrefaceString = null;
333 return true;
334 }
335 return false;
336 }
337
338
339
340
341
342
343
344
345
346 private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
347 if (in.readableBytes() < 5) {
348
349 return false;
350 }
351
352 short frameType = in.getUnsignedByte(in.readerIndex() + 3);
353 if (frameType != SETTINGS) {
354 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
355 "Hex dump for first 5 bytes: %s",
356 hexDump(in, in.readerIndex(), 5));
357 }
358 short flags = in.getUnsignedByte(in.readerIndex() + 4);
359 if ((flags & Http2Flags.ACK) != 0) {
360 throw connectionError(PROTOCOL_ERROR, "First received frame was SETTINGS frame but had ACK flag set. " +
361 "Hex dump for first 5 bytes: %s",
362 hexDump(in, in.readerIndex(), 5));
363 }
364 return true;
365 }
366
367
368
369
370 private void sendPreface(ChannelHandlerContext ctx) throws Exception {
371 if (prefaceSent || !ctx.channel().isActive()) {
372 return;
373 }
374
375 prefaceSent = true;
376
377 final boolean isClient = !connection().isServer();
378 if (isClient) {
379
380 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
381 }
382
383
384 encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
385 ChannelFutureListener.CLOSE_ON_FAILURE);
386
387 if (isClient) {
388
389
390
391 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
392 }
393 }
394 }
395
396 private final class FrameDecoder extends BaseDecoder {
397 @Override
398 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
399 try {
400 decoder.decodeFrame(ctx, in, out);
401 } catch (Throwable e) {
402 onError(ctx, false, e);
403 }
404 }
405 }
406
407 @Override
408 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
409
410 encoder.lifecycleManager(this);
411 decoder.lifecycleManager(this);
412 encoder.flowController().channelHandlerContext(ctx);
413 decoder.flowController().channelHandlerContext(ctx);
414 byteDecoder = new PrefaceDecoder(ctx);
415 }
416
417 @Override
418 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
419 if (byteDecoder != null) {
420 byteDecoder.handlerRemoved(ctx);
421 byteDecoder = null;
422 }
423 }
424
425 @Override
426 public void channelActive(ChannelHandlerContext ctx) throws Exception {
427 if (byteDecoder == null) {
428 byteDecoder = new PrefaceDecoder(ctx);
429 }
430 byteDecoder.channelActive(ctx);
431 super.channelActive(ctx);
432 }
433
434 @Override
435 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
436
437 super.channelInactive(ctx);
438 if (byteDecoder != null) {
439 byteDecoder.channelInactive(ctx);
440 byteDecoder = null;
441 }
442 }
443
444 @Override
445 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
446
447
448 try {
449 if (ctx.channel().isWritable()) {
450 flush(ctx);
451 }
452 encoder.flowController().channelWritabilityChanged();
453 } finally {
454 super.channelWritabilityChanged(ctx);
455 }
456 }
457
458 @Override
459 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
460 byteDecoder.decode(ctx, in, out);
461 }
462
463 @Override
464 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
465 ctx.bind(localAddress, promise);
466 }
467
468 @Override
469 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
470 ChannelPromise promise) throws Exception {
471 ctx.connect(remoteAddress, localAddress, promise);
472 }
473
474 @Override
475 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
476 ctx.disconnect(promise);
477 }
478
479 @Override
480 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
481 if (decoupleCloseAndGoAway) {
482 ctx.close(promise);
483 return;
484 }
485 promise = promise.unvoid();
486
487 if (!ctx.channel().isActive() || !prefaceSent()) {
488 ctx.close(promise);
489 return;
490 }
491
492
493
494
495
496
497 ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
498 ctx.flush();
499 doGracefulShutdown(ctx, f, promise);
500 }
501
502 private ChannelFutureListener newClosingChannelFutureListener(
503 ChannelHandlerContext ctx, ChannelPromise promise) {
504 long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
505 return gracefulShutdownTimeoutMillis < 0 ?
506 new ClosingChannelFutureListener(ctx, promise) :
507 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
508 }
509
510 private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
511 final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
512 if (isGracefulShutdownComplete()) {
513
514
515 future.addListener(listener);
516 } else {
517
518
519
520
521 if (closeListener == null) {
522 closeListener = listener;
523 } else if (promise != null) {
524 final ChannelFutureListener oldCloseListener = closeListener;
525 closeListener = future1 -> {
526 try {
527 oldCloseListener.operationComplete(future1);
528 } finally {
529 listener.operationComplete(future1);
530 }
531 };
532 }
533 }
534 }
535
536 @Override
537 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
538 ctx.deregister(promise);
539 }
540
541 @Override
542 public void read(ChannelHandlerContext ctx) throws Exception {
543 ctx.read();
544 }
545
546 @Override
547 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
548 ctx.write(msg, promise);
549 }
550
551 @Override
552 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
553
554
555 try {
556
557 channelReadComplete0(ctx);
558 } finally {
559 flush(ctx);
560 }
561 }
562
563 final void channelReadComplete0(ChannelHandlerContext ctx) {
564
565 discardSomeReadBytes();
566
567
568
569
570 if (!ctx.channel().config().isAutoRead()) {
571 ctx.read();
572 }
573
574 ctx.fireChannelReadComplete();
575 }
576
577
578
579
580 @Override
581 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
582 if (getEmbeddedHttp2Exception(cause) != null) {
583
584 onError(ctx, false, cause);
585 } else {
586 super.exceptionCaught(ctx, cause);
587 }
588 }
589
590
591
592
593
594
595
596
597 @Override
598 public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
599 switch (stream.state()) {
600 case HALF_CLOSED_LOCAL:
601 case OPEN:
602 stream.closeLocalSide();
603 break;
604 default:
605 closeStream(stream, future);
606 break;
607 }
608 }
609
610
611
612
613
614
615
616
617 @Override
618 public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
619 switch (stream.state()) {
620 case HALF_CLOSED_REMOTE:
621 case OPEN:
622 stream.closeRemoteSide();
623 break;
624 default:
625 closeStream(stream, future);
626 break;
627 }
628 }
629
630 @Override
631 public void closeStream(final Http2Stream stream, ChannelFuture future) {
632 if (future.isDone()) {
633 doCloseStream(stream, future);
634 } else {
635 future.addListener((ChannelFutureListener) future1 -> doCloseStream(stream, future1));
636 }
637 }
638
639
640
641
642 @Override
643 public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
644 Http2Exception embedded = getEmbeddedHttp2Exception(cause);
645 if (isStreamError(embedded)) {
646 onStreamError(ctx, outbound, cause, (StreamException) embedded);
647 } else if (embedded instanceof CompositeStreamException) {
648 CompositeStreamException compositException = (CompositeStreamException) embedded;
649 for (StreamException streamException : compositException) {
650 onStreamError(ctx, outbound, cause, streamException);
651 }
652 } else {
653 onConnectionError(ctx, outbound, cause, embedded);
654 }
655 ctx.flush();
656 }
657
658
659
660
661
662
663 protected boolean isGracefulShutdownComplete() {
664 return connection().numActiveStreams() == 0;
665 }
666
667
668
669
670
671
672
673
674
675
676
677 protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
678 Throwable cause, Http2Exception http2Ex) {
679 if (http2Ex == null) {
680 http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
681 }
682
683 ChannelPromise promise = ctx.newPromise();
684 ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
685 if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
686 doGracefulShutdown(ctx, future, promise);
687 } else {
688 future.addListener(newClosingChannelFutureListener(ctx, promise));
689 }
690 }
691
692
693
694
695
696
697
698
699
700
701 protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
702 @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
703 final int streamId = http2Ex.streamId();
704 Http2Stream stream = connection().stream(streamId);
705
706
707 if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
708 ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
709 connection().isServer()) {
710
711
712
713
714
715
716
717 if (stream == null) {
718 try {
719 stream = encoder.connection().remote().createStream(streamId, true);
720 } catch (Http2Exception e) {
721 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
722 return;
723 }
724 }
725
726
727 if (stream != null && !stream.isHeadersSent()) {
728 try {
729 handleServerHeaderDecodeSizeError(ctx, stream);
730 } catch (Throwable cause2) {
731 onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
732 }
733 }
734 }
735
736 if (stream == null) {
737 if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
738 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
739 }
740 } else {
741 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
742 }
743 }
744
745
746
747
748
749
750
751
752 protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
753 encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
754 }
755
756 protected Http2FrameWriter frameWriter() {
757 return encoder().frameWriter();
758 }
759
760
761
762
763
764
765 private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
766 ChannelPromise promise) {
767 ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
768 if (future.isDone()) {
769 closeConnectionOnError(ctx, future);
770 } else {
771 future.addListener((ChannelFutureListener) f -> closeConnectionOnError(ctx, f));
772 }
773 return future;
774 }
775
776 @Override
777 public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
778 ChannelPromise promise) {
779 final Http2Stream stream = connection().stream(streamId);
780 if (stream == null) {
781 return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
782 }
783
784 return resetStream(ctx, stream, errorCode, promise);
785 }
786
787 private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
788 long errorCode, ChannelPromise promise) {
789 promise = promise.unvoid();
790 if (stream.isResetSent()) {
791
792 return promise.setSuccess();
793 }
794
795
796
797
798
799 stream.resetSent();
800
801 final ChannelFuture future;
802
803
804 if (stream.state() == IDLE ||
805 connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
806 future = promise.setSuccess();
807 } else {
808 future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
809 }
810 if (future.isDone()) {
811 processRstStreamWriteResult(ctx, stream, future);
812 } else {
813 future.addListener((ChannelFutureListener) f -> processRstStreamWriteResult(ctx, stream, f));
814 }
815
816 return future;
817 }
818
819 @Override
820 public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
821 final ByteBuf debugData, ChannelPromise promise) {
822 promise = promise.unvoid();
823 final Http2Connection connection = connection();
824 try {
825 if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
826 debugData.release();
827 promise.trySuccess();
828 return promise;
829 }
830 } catch (Throwable cause) {
831 debugData.release();
832 promise.tryFailure(cause);
833 return promise;
834 }
835
836
837
838 debugData.retain();
839 ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
840
841 if (future.isDone()) {
842 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
843 } else {
844 future.addListener((ChannelFutureListener) f ->
845 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, f));
846 }
847
848 return future;
849 }
850
851
852
853
854
855 private void checkCloseConnection(ChannelFuture future) {
856
857
858 if (closeListener != null && isGracefulShutdownComplete()) {
859 ChannelFutureListener closeListener = this.closeListener;
860
861
862 this.closeListener = null;
863 try {
864 closeListener.operationComplete(future);
865 } catch (Exception e) {
866 throw new IllegalStateException("Close listener threw an unexpected exception", e);
867 }
868 }
869 }
870
871
872
873
874
875 private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
876 long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
877 int lastKnownStream;
878 if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
879
880
881
882
883 lastKnownStream = Integer.MAX_VALUE;
884 } else {
885 lastKnownStream = connection().remote().lastStreamCreated();
886 }
887 return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
888 }
889
890 private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
891 if (future.isSuccess()) {
892 closeStream(stream, future);
893 } else {
894
895 onConnectionError(ctx, true, future.cause(), null);
896 }
897 }
898
899 private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
900 if (!future.isSuccess()) {
901 onConnectionError(ctx, true, future.cause(), null);
902 }
903 }
904
905 private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
906 stream.close();
907 checkCloseConnection(future);
908 }
909
910
911
912
913 private static ByteBuf clientPrefaceString(Http2Connection connection) {
914 return connection.isServer() ? connectionPrefaceBuf() : null;
915 }
916
917 private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
918 final long errorCode, final ByteBuf debugData, ChannelFuture future) {
919 try {
920 if (future.isSuccess()) {
921 if (errorCode != NO_ERROR.code()) {
922 if (logger.isDebugEnabled()) {
923 logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
924 "debugData '{}'. Forcing shutdown of the connection.",
925 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
926 }
927 ctx.close();
928 }
929 } else {
930 if (logger.isDebugEnabled()) {
931 logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
932 "debugData '{}'. Forcing shutdown of the connection.",
933 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
934 }
935 ctx.close();
936 }
937 } finally {
938
939 debugData.release();
940 }
941 }
942
943
944
945
946 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
947 private final ChannelHandlerContext ctx;
948 private final ChannelPromise promise;
949 private final Future<?> timeoutTask;
950 private boolean closed;
951
952 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
953 this.ctx = ctx;
954 this.promise = promise;
955 timeoutTask = null;
956 }
957
958 ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
959 long timeout, TimeUnit unit) {
960 this.ctx = ctx;
961 this.promise = promise;
962 timeoutTask = ctx.executor().schedule(new Runnable() {
963 @Override
964 public void run() {
965 doClose();
966 }
967 }, timeout, unit);
968 }
969
970 @Override
971 public void operationComplete(ChannelFuture sentGoAwayFuture) {
972 if (timeoutTask != null) {
973 timeoutTask.cancel(false);
974 }
975 doClose();
976 }
977
978 private void doClose() {
979
980
981 if (closed) {
982
983 assert timeoutTask != null;
984 return;
985 }
986 closed = true;
987 if (promise == null) {
988 ctx.close();
989 } else {
990 ctx.close(promise);
991 }
992 }
993 }
994 }