1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelOutboundHandler;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.http.HttpResponseStatus;
27 import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28 import io.netty.handler.codec.http2.Http2Exception.StreamException;
29 import io.netty.util.CharsetUtil;
30 import io.netty.util.concurrent.Future;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.net.SocketAddress;
35 import java.util.List;
36 import java.util.concurrent.TimeUnit;
37
38 import static io.netty.buffer.ByteBufUtil.hexDump;
39 import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
40 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
41 import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
42 import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
43 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44 import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
45 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
46 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
47 import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
48 import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
49 import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
50 import static io.netty.util.CharsetUtil.UTF_8;
51 import static io.netty.util.internal.ObjectUtil.checkNotNull;
52 import static java.lang.Math.min;
53 import static java.util.concurrent.TimeUnit.MILLISECONDS;
54
55
56
57
58
59
60
61
62
63
64 public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
65 ChannelOutboundHandler {
66
// Logger for this handler; used for debug output once a GOAWAY write has completed.
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);

// Headers written by handleServerHeaderDecodeSizeError(...): built from the text form of the
// REQUEST_HEADER_FIELDS_TOO_LARGE status code.
private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
        HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
// The ASCII bytes "HTTP/1." as a read-only, unreleasable buffer. Searched for in the inbound data when the
// client preface does not match, so that an HTTP/1.x request can be reported specifically.
private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
        Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();

// Inbound and outbound halves of the codec; the constructor verifies both share one connection object.
private final Http2ConnectionDecoder decoder;
private final Http2ConnectionEncoder encoder;
// Settings written to the peer as part of sending the preface.
private final Http2Settings initialSettings;
// When true, close(...) only closes the channel and does not send a GOAWAY first.
private final boolean decoupleCloseAndGoAway;
// When true, the context is flushed right after the preface/settings have been written.
private final boolean flushPreface;
// Listener run by checkCloseConnection(...) once graceful shutdown is complete; null when none is pending.
private ChannelFutureListener closeListener;
// Current inbound decoding strategy (a PrefaceDecoder first, then a FrameDecoder). Null until the handler is
// added and again after it was removed or the channel became inactive.
private BaseDecoder byteDecoder;
// Graceful shutdown timeout in milliseconds; a negative value means no timeout task is scheduled.
private long gracefulShutdownTimeoutMillis;
82
/**
 * Creates a handler with {@code decoupleCloseAndGoAway} set to {@code false}.
 *
 * @param decoder         the connection decoder; must not be {@code null}
 * @param encoder         the connection encoder; must not be {@code null}
 * @param initialSettings the settings written to the peer with the preface; must not be {@code null}
 */
protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
        Http2Settings initialSettings) {
    this(decoder, encoder, initialSettings, false);
}
87
/**
 * Creates a handler with {@code flushPreface} set to {@code true}.
 *
 * @param decoder                the connection decoder; must not be {@code null}
 * @param encoder                the connection encoder; must not be {@code null}
 * @param initialSettings        the settings written to the peer with the preface; must not be {@code null}
 * @param decoupleCloseAndGoAway if {@code true}, {@code close(...)} does not send a GOAWAY before closing
 */
protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
        Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
    this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
}
92
93 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
94 Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
95 boolean flushPreface) {
96 this.initialSettings = checkNotNull(initialSettings, "initialSettings");
97 this.decoder = checkNotNull(decoder, "decoder");
98 this.encoder = checkNotNull(encoder, "encoder");
99 this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
100 this.flushPreface = flushPreface;
101 if (encoder.connection() != decoder.connection()) {
102 throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
103 }
104 }
105
106
107
108
109
110
111 public long gracefulShutdownTimeoutMillis() {
112 return gracefulShutdownTimeoutMillis;
113 }
114
115
116
117
118
119
120
121 public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
122 if (gracefulShutdownTimeoutMillis < -1) {
123 throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
124 " (expected: -1 for indefinite or >= 0)");
125 }
126 this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
127 }
128
129 public Http2Connection connection() {
130 return encoder.connection();
131 }
132
133 public Http2ConnectionDecoder decoder() {
134 return decoder;
135 }
136
137 public Http2ConnectionEncoder encoder() {
138 return encoder;
139 }
140
141 private boolean prefaceSent() {
142 return byteDecoder != null && byteDecoder.prefaceSent();
143 }
144
145
146
147
148
149 public void onHttpClientUpgrade() throws Http2Exception {
150 if (connection().isServer()) {
151 throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
152 }
153 if (!prefaceSent()) {
154
155
156 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
157 }
158 if (decoder.prefaceReceived()) {
159 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
160 }
161
162
163 connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
164 }
165
166
167
168
169
170 public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
171 if (!connection().isServer()) {
172 throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
173 }
174 if (!prefaceSent()) {
175
176
177 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
178 }
179 if (decoder.prefaceReceived()) {
180 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
181 }
182
183
184 encoder.remoteSettings(settings);
185
186
187 connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
188 }
189
190 @Override
191 public void flush(ChannelHandlerContext ctx) {
192 try {
193
194 encoder.flowController().writePendingBytes();
195 ctx.flush();
196 } catch (Http2Exception e) {
197 onError(ctx, true, e);
198 } catch (Throwable cause) {
199 onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
200 }
201 }
202
/**
 * Strategy the handler delegates inbound processing and lifecycle callbacks to. Subclasses provide the
 * actual {@code decode} logic; the lifecycle hooks default to no-ops except {@code channelInactive}.
 */
private abstract class BaseDecoder {
    /** Decodes inbound bytes; invoked from the handler's own {@code decode(...)}. */
    public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    /** Hook for handler removal; no-op by default. */
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
    /** Hook for channel activation; no-op by default. */
    public void channelActive(ChannelHandlerContext ctx) throws Exception { }

    /** Closes the encoder, then the decoder, then the connection (using the context's void promise). */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // The channel is gone: shut down both codec halves first.
        encoder().close();
        decoder().close();

        // Then close the connection itself; completion is not observed (void promise).
        connection().close(ctx.voidPromise());
    }

    /**
     * Whether the preface has been sent. The base implementation always answers {@code true}.
     */
    public boolean prefaceSent() {
        return true;
    }

    /**
     * Sends the preface if that has not happened yet. The base implementation does nothing.
     */
    public void sendPrefaceIfNeeded(ChannelHandlerContext ctx) throws Exception {
        // Nothing to send by default.
    }
}
235
236 private final class PrefaceDecoder extends BaseDecoder {
// Remaining part of the client preface still expected from the peer; null when nothing (more) is expected.
private ByteBuf clientPrefaceString;
// Set once sendPrefaceIfNeeded(...) has started sending, so the preface is written at most once.
private boolean prefaceSent;
239
/**
 * Initializes the expected client preface (servers only) and immediately attempts to send the local
 * preface; that attempt is a no-op while the channel is not active.
 */
PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
    this.clientPrefaceString = Http2ConnectionHandler.clientPrefaceString(encoder.connection());

    // May already be possible if the channel is active at construction time.
    this.sendPrefaceIfNeeded(ctx);
}
246
247 @Override
248 public boolean prefaceSent() {
249 return prefaceSent;
250 }
251
252 @Override
253 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
254 try {
255 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
256
257 byteDecoder = new FrameDecoder();
258 byteDecoder.decode(ctx, in, out);
259 }
260 } catch (Throwable e) {
261 if (byteDecoder != null) {
262
263 in.skipBytes(in.readableBytes());
264 }
265 onError(ctx, false, e);
266 }
267 }
268
269 @Override
270 public void channelActive(ChannelHandlerContext ctx) throws Exception {
271
272 sendPrefaceIfNeeded(ctx);
273 }
274
275 @Override
276 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
277 cleanup();
278 super.channelInactive(ctx);
279 }
280
281
282
283
284 @Override
285 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
286 cleanup();
287 }
288
289
290
291
292 private void cleanup() {
293 if (clientPrefaceString != null) {
294 clientPrefaceString.release();
295 clientPrefaceString = null;
296 }
297 }
298
299
300
301
302
303
304
305 private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
306 if (clientPrefaceString == null) {
307 return true;
308 }
309
310 int prefaceRemaining = clientPrefaceString.readableBytes();
311 int bytesRead = min(in.readableBytes(), prefaceRemaining);
312
313
314 if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
315 clientPrefaceString, clientPrefaceString.readerIndex(),
316 bytesRead)) {
317 int maxSearch = 1024;
318 int http1Index =
319 ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
320 if (http1Index != -1) {
321 String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
322 throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
323 }
324 String receivedBytes = hexDump(in, in.readerIndex(),
325 min(in.readableBytes(), clientPrefaceString.readableBytes()));
326 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
327 "Hex dump for received bytes: %s", receivedBytes);
328 }
329 in.skipBytes(bytesRead);
330 clientPrefaceString.skipBytes(bytesRead);
331
332 if (!clientPrefaceString.isReadable()) {
333
334 clientPrefaceString.release();
335 clientPrefaceString = null;
336 return true;
337 }
338 return false;
339 }
340
341
342
343
344
345
346
347
348
349 private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
350 if (in.readableBytes() < 5) {
351
352 return false;
353 }
354
355 short frameType = in.getUnsignedByte(in.readerIndex() + 3);
356 if (frameType != SETTINGS) {
357 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
358 "Hex dump for first 5 bytes: %s",
359 hexDump(in, in.readerIndex(), 5));
360 }
361 short flags = in.getUnsignedByte(in.readerIndex() + 4);
362 if ((flags & Http2Flags.ACK) != 0) {
363 throw connectionError(PROTOCOL_ERROR, "First received frame was SETTINGS frame but had ACK flag set. " +
364 "Hex dump for first 5 bytes: %s",
365 hexDump(in, in.readerIndex(), 5));
366 }
367 return true;
368 }
369
370
371
372
373 @Override
374 public void sendPrefaceIfNeeded(ChannelHandlerContext ctx) throws Exception {
375 if (prefaceSent || !ctx.channel().isActive()) {
376 return;
377 }
378
379 prefaceSent = true;
380
381 final boolean isClient = !connection().isServer();
382 if (isClient) {
383
384 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
385 }
386
387
388 encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
389 ChannelFutureListener.CLOSE_ON_FAILURE);
390
391 try {
392 if (isClient) {
393
394
395
396 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
397 }
398 } finally {
399 if (flushPreface) {
400
401
402
403 ctx.flush();
404 }
405 }
406 }
407 }
408
409 private final class FrameDecoder extends BaseDecoder {
410 @Override
411 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
412 try {
413 decoder.decodeFrame(ctx, in, out);
414 } catch (Throwable e) {
415 onError(ctx, false, e);
416 }
417 }
418 }
419
/**
 * Wires this handler into the codec when it is added to a pipeline: registers itself as lifecycle manager
 * on encoder and decoder, hands the context to both flow controllers, and installs a fresh
 * {@code PrefaceDecoder} (whose constructor already tries to send the preface).
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    // Lifecycle manager first, then the flow controllers' context, then the initial decoder.
    encoder.lifecycleManager(this);
    decoder.lifecycleManager(this);
    encoder.flowController().channelHandlerContext(ctx);
    decoder.flowController().channelHandlerContext(ctx);
    byteDecoder = new PrefaceDecoder(ctx);
}
429
430 @Override
431 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
432 if (byteDecoder != null) {
433 byteDecoder.handlerRemoved(ctx);
434 byteDecoder = null;
435 }
436 }
437
438 @Override
439 public void channelActive(ChannelHandlerContext ctx) throws Exception {
440 if (byteDecoder == null) {
441 byteDecoder = new PrefaceDecoder(ctx);
442 }
443 byteDecoder.channelActive(ctx);
444 super.channelActive(ctx);
445 }
446
447 @Override
448 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
449
450 super.channelInactive(ctx);
451 if (byteDecoder != null) {
452 byteDecoder.channelInactive(ctx);
453 byteDecoder = null;
454 }
455 }
456
457 @Override
458 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
459
460
461 try {
462 if (ctx.channel().isWritable()) {
463 flush(ctx);
464 }
465 encoder.flowController().channelWritabilityChanged();
466 } finally {
467 super.channelWritabilityChanged(ctx);
468 }
469 }
470
471 @Override
472 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
473 byteDecoder.decode(ctx, in, out);
474 }
475
476 @Override
477 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
478
479
480
481 ctx.bind(localAddress, ctx.newPromise()).addListener(new PrefaceSendListener(ctx, promise));
482 }
483
484 @Override
485 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
486 ChannelPromise promise) throws Exception {
487
488
489
490 ctx.connect(remoteAddress, localAddress, ctx.newPromise()).addListener(new PrefaceSendListener(ctx, promise));
491 }
492
/**
 * Passes the disconnect request on to the context unchanged.
 */
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    ctx.disconnect(promise);
}
497
498 @Override
499 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
500 if (decoupleCloseAndGoAway) {
501 ctx.close(promise);
502 return;
503 }
504 promise = promise.unvoid();
505
506 if (!ctx.channel().isActive() || !prefaceSent()) {
507 ctx.close(promise);
508 return;
509 }
510
511
512
513
514
515
516 ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
517 ctx.flush();
518 doGracefulShutdown(ctx, f, promise);
519 }
520
521 private ChannelFutureListener newClosingChannelFutureListener(
522 ChannelHandlerContext ctx, ChannelPromise promise) {
523 long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
524 return gracefulShutdownTimeoutMillis < 0 ?
525 new ClosingChannelFutureListener(ctx, promise) :
526 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
527 }
528
529 private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
530 final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
531 if (isGracefulShutdownComplete()) {
532
533
534 future.addListener(listener);
535 } else {
536
537
538
539
540 if (closeListener == null) {
541 closeListener = listener;
542 } else if (promise != null) {
543 final ChannelFutureListener oldCloseListener = closeListener;
544 closeListener = future1 -> {
545 try {
546 oldCloseListener.operationComplete(future1);
547 } finally {
548 listener.operationComplete(future1);
549 }
550 };
551 }
552 }
553 }
554
/**
 * Passes the deregister request on to the context unchanged.
 */
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    ctx.deregister(promise);
}
559
/**
 * Passes the read request on to the context unchanged.
 */
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
    ctx.read();
}
564
/**
 * Passes the message and promise on to the context unchanged.
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}
569
/**
 * Completes the read cycle via {@code channelReadComplete0(...)} and always flushes afterwards, even if
 * that call throws.
 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    try {
        // Run the read-complete handling before flushing.
        channelReadComplete0(ctx);
    } finally {
        flush(ctx);
    }
}
581
582 final void channelReadComplete0(ChannelHandlerContext ctx) {
583
584 discardSomeReadBytes();
585
586
587
588
589 if (!ctx.channel().config().isAutoRead()) {
590 ctx.read();
591 }
592
593 ctx.fireChannelReadComplete();
594 }
595
596
597
598
599 @Override
600 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
601 if (getEmbeddedHttp2Exception(cause) != null) {
602
603 onError(ctx, false, cause);
604 } else {
605 super.exceptionCaught(ctx, cause);
606 }
607 }
608
609
610
611
612
613
614
615
616 @Override
617 public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
618 switch (stream.state()) {
619 case HALF_CLOSED_LOCAL:
620 case OPEN:
621 stream.closeLocalSide();
622 break;
623 default:
624 closeStream(stream, future);
625 break;
626 }
627 }
628
629
630
631
632
633
634
635
636 @Override
637 public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
638 switch (stream.state()) {
639 case HALF_CLOSED_REMOTE:
640 case OPEN:
641 stream.closeRemoteSide();
642 break;
643 default:
644 closeStream(stream, future);
645 break;
646 }
647 }
648
649 @Override
650 public void closeStream(final Http2Stream stream, ChannelFuture future) {
651 if (future.isDone()) {
652 doCloseStream(stream, future);
653 } else {
654 future.addListener((ChannelFutureListener) future1 -> doCloseStream(stream, future1));
655 }
656 }
657
658
659
660
661 @Override
662 public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
663 Http2Exception embedded = getEmbeddedHttp2Exception(cause);
664 if (isStreamError(embedded)) {
665 onStreamError(ctx, outbound, cause, (StreamException) embedded);
666 } else if (embedded instanceof CompositeStreamException) {
667 CompositeStreamException compositException = (CompositeStreamException) embedded;
668 for (StreamException streamException : compositException) {
669 onStreamError(ctx, outbound, cause, streamException);
670 }
671 } else {
672 onConnectionError(ctx, outbound, cause, embedded);
673 }
674 ctx.flush();
675 }
676
677
678
679
680
681
682 protected boolean isGracefulShutdownComplete() {
683 return connection().numActiveStreams() == 0;
684 }
685
686
687
688
689
690
691
692
693
694
695
696 protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
697 Throwable cause, Http2Exception http2Ex) {
698 if (http2Ex == null) {
699 http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
700 }
701
702 ChannelPromise promise = ctx.newPromise();
703 ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
704 if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
705 doGracefulShutdown(ctx, future, promise);
706 } else {
707 future.addListener(newClosingChannelFutureListener(ctx, promise));
708 }
709 }
710
711
712
713
714
715
716
717
718
719
720 protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
721 @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
722 final int streamId = http2Ex.streamId();
723 Http2Stream stream = connection().stream(streamId);
724
725
726 if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
727 ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
728 connection().isServer()) {
729
730
731
732
733
734
735
736 if (stream == null) {
737 try {
738 stream = encoder.connection().remote().createStream(streamId, true);
739 } catch (Http2Exception e) {
740 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
741 return;
742 }
743 }
744
745
746 if (stream != null && !stream.isHeadersSent()) {
747 try {
748 handleServerHeaderDecodeSizeError(ctx, stream);
749 } catch (Throwable cause2) {
750 onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
751 }
752 }
753 }
754
755 if (stream == null) {
756 if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
757 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
758 }
759 } else {
760 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
761 }
762 }
763
764
765
766
767
768
769
770
/**
 * Writes {@code HEADERS_TOO_LARGE_HEADERS} on the given stream through the encoder. Called by
 * {@code onStreamError(...)} on servers when a header list size violation occurred during decoding.
 *
 * @param ctx    the handler context
 * @param stream the stream to write the headers on
 */
protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
    // NOTE(review): the literals 0 and true are presumably padding and endStream - confirm against
    // Http2ConnectionEncoder.writeHeaders.
    encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
}
774
/**
 * @return the frame writer of the encoder returned by {@link #encoder()}
 */
protected Http2FrameWriter frameWriter() {
    return encoder().frameWriter();
}
778
779
780
781
782
783
784 private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
785 ChannelPromise promise) {
786 ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
787 if (future.isDone()) {
788 closeConnectionOnError(ctx, future);
789 } else {
790 future.addListener((ChannelFutureListener) f -> closeConnectionOnError(ctx, f));
791 }
792 return future;
793 }
794
795 @Override
796 public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
797 ChannelPromise promise) {
798 final Http2Stream stream = connection().stream(streamId);
799 if (stream == null) {
800 return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
801 }
802
803 return resetStream(ctx, stream, errorCode, promise);
804 }
805
806 private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
807 long errorCode, ChannelPromise promise) {
808 promise = promise.unvoid();
809 if (stream.isResetSent()) {
810
811 return promise.setSuccess();
812 }
813
814
815
816
817
818 stream.resetSent();
819
820 final ChannelFuture future;
821
822
823 if (stream.state() == IDLE ||
824 connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
825 future = promise.setSuccess();
826 } else {
827 future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
828 }
829 if (future.isDone()) {
830 processRstStreamWriteResult(ctx, stream, future);
831 } else {
832 future.addListener((ChannelFutureListener) f -> processRstStreamWriteResult(ctx, stream, f));
833 }
834
835 return future;
836 }
837
/**
 * Records the GOAWAY on the connection and, if the connection accepts it, writes the frame.
 *
 * @param ctx          the handler context
 * @param lastStreamId the last stream id to announce
 * @param errorCode    the error code to announce
 * @param debugData    additional debug data; released by this method on every path that does not write it
 * @param promise      completed with the outcome; unvoided before use
 * @return the promise (completed) if nothing was written, otherwise the future of the GOAWAY write
 */
@Override
public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
        final ByteBuf debugData, ChannelPromise promise) {
    promise = promise.unvoid();
    final Http2Connection connection = connection();
    try {
        if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
            // The connection reports that no GOAWAY needs to be written: drop the data and succeed.
            debugData.release();
            promise.trySuccess();
            return promise;
        }
    } catch (Throwable cause) {
        // Recording the GOAWAY failed: drop the data and fail the promise.
        debugData.release();
        promise.tryFailure(cause);
        return promise;
    }

    // Take an extra reference before the write so that processGoAwayWriteResult(...) can still read the
    // data (for logging) after the write; that method releases this reference again.
    debugData.retain();
    ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);

    if (future.isDone()) {
        processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
    } else {
        future.addListener((ChannelFutureListener) f ->
                processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, f));
    }

    return future;
}
869
870
871
872
873
874 private void checkCloseConnection(ChannelFuture future) {
875
876
877 if (closeListener != null && isGracefulShutdownComplete()) {
878 ChannelFutureListener closeListener = this.closeListener;
879
880
881 this.closeListener = null;
882 try {
883 closeListener.operationComplete(future);
884 } catch (Exception e) {
885 throw new IllegalStateException("Close listener threw an unexpected exception", e);
886 }
887 }
888 }
889
890
891
892
893
894 private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
895 long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
896 int lastKnownStream;
897 if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
898
899
900
901
902 lastKnownStream = Integer.MAX_VALUE;
903 } else {
904 lastKnownStream = connection().remote().lastStreamCreated();
905 }
906 return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
907 }
908
909 private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
910 if (future.isSuccess()) {
911 closeStream(stream, future);
912 } else {
913
914 onConnectionError(ctx, true, future.cause(), null);
915 }
916 }
917
918 private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
919 if (!future.isSuccess()) {
920 onConnectionError(ctx, true, future.cause(), null);
921 }
922 }
923
/**
 * Closes the stream and then checks whether a pending graceful shutdown can now complete.
 */
private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
    stream.close();
    // Closing a stream may have been the last thing a pending close was waiting for.
    checkCloseConnection(future);
}
928
929
930
931
932 private static ByteBuf clientPrefaceString(Http2Connection connection) {
933 return connection.isServer() ? connectionPrefaceBuf() : null;
934 }
935
936 private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
937 final long errorCode, final ByteBuf debugData, ChannelFuture future) {
938 try {
939 if (future.isSuccess()) {
940 if (errorCode != NO_ERROR.code()) {
941 if (logger.isDebugEnabled()) {
942 logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
943 "debugData '{}'. Forcing shutdown of the connection.",
944 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8));
945 }
946 ctx.close();
947 }
948 } else {
949 if (logger.isDebugEnabled()) {
950 logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
951 "debugData '{}'. Forcing shutdown of the connection.",
952 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
953 }
954 ctx.close();
955 }
956 } finally {
957
958 debugData.release();
959 }
960 }
961
962
963
964
965 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
966 private final ChannelHandlerContext ctx;
967 private final ChannelPromise promise;
968 private final Future<?> timeoutTask;
969 private boolean closed;
970
971 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
972 this.ctx = ctx;
973 this.promise = promise;
974 timeoutTask = null;
975 }
976
977 ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
978 long timeout, TimeUnit unit) {
979 this.ctx = ctx;
980 this.promise = promise;
981 timeoutTask = ctx.executor().schedule(new Runnable() {
982 @Override
983 public void run() {
984 doClose();
985 }
986 }, timeout, unit);
987 }
988
989 @Override
990 public void operationComplete(ChannelFuture sentGoAwayFuture) {
991 if (timeoutTask != null) {
992 timeoutTask.cancel(false);
993 }
994 doClose();
995 }
996
997 private void doClose() {
998
999
1000 if (closed) {
1001
1002 assert timeoutTask != null;
1003 return;
1004 }
1005 closed = true;
1006 if (promise == null) {
1007 ctx.close();
1008 } else {
1009 ctx.close(promise);
1010 }
1011 }
1012 }
1013
1014 private final class PrefaceSendListener implements ChannelFutureListener {
1015 private final ChannelHandlerContext ctx;
1016 private final ChannelPromise promise;
1017
1018 PrefaceSendListener(ChannelHandlerContext ctx, ChannelPromise promise) {
1019 this.ctx = ctx;
1020 this.promise = promise;
1021 }
1022
1023 @Override
1024 public void operationComplete(ChannelFuture f) {
1025 if (f.isSuccess()) {
1026 try {
1027 if (byteDecoder != null) {
1028 byteDecoder.sendPrefaceIfNeeded(ctx);
1029 }
1030 } catch (Throwable e) {
1031 promise.setFailure(e);
1032 return;
1033 }
1034 promise.setSuccess();
1035 } else {
1036 promise.setFailure(f.cause());
1037 }
1038 }
1039 }
1040 }