1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelOutboundHandler;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.http.HttpResponseStatus;
27 import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28 import io.netty.handler.codec.http2.Http2Exception.StreamException;
29 import io.netty.util.CharsetUtil;
30 import io.netty.util.concurrent.Future;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.net.SocketAddress;
35 import java.util.List;
36 import java.util.concurrent.TimeUnit;
37
38 import static io.netty.buffer.ByteBufUtil.hexDump;
39 import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
40 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
41 import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
42 import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
43 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44 import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
45 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
46 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
47 import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
48 import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
49 import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
50 import static io.netty.util.CharsetUtil.UTF_8;
51 import static io.netty.util.internal.ObjectUtil.checkNotNull;
52 import static java.lang.Math.min;
53 import static java.util.concurrent.TimeUnit.MILLISECONDS;
54
55
56
57
58
59
60
61
62
63
64 public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
65 ChannelOutboundHandler {
66
67 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
68
69 private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
70 HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
71 private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
72 Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();
73
74 private final Http2ConnectionDecoder decoder;
75 private final Http2ConnectionEncoder encoder;
76 private final Http2Settings initialSettings;
77 private final boolean decoupleCloseAndGoAway;
78 private final boolean flushPreface;
79 private ChannelFutureListener closeListener;
80 private BaseDecoder byteDecoder;
81 private long gracefulShutdownTimeoutMillis;
82
/**
 * Creates a handler that couples {@code close} with sending a GOAWAY frame
 * ({@code decoupleCloseAndGoAway} is {@code false}).
 *
 * @param decoder         the decoder for inbound frames
 * @param encoder         the encoder for outbound frames
 * @param initialSettings the settings written as part of the connection preface
 */
protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
                                 Http2Settings initialSettings) {
    // Delegate with the default: closing the channel also triggers a GOAWAY.
    this(decoder, encoder, initialSettings, false);
}
87
/**
 * Creates a handler that flushes the preface once the channel becomes active
 * ({@code flushPreface} is {@code true}).
 *
 * @param decoder                the decoder for inbound frames
 * @param encoder                the encoder for outbound frames
 * @param initialSettings        the settings written as part of the connection preface
 * @param decoupleCloseAndGoAway if {@code true}, {@code close} does not write a GOAWAY frame first
 */
protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
                                 Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
    // Delegate with the default: the preface is flushed eagerly.
    this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
}
92
93 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
94 Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
95 boolean flushPreface) {
96 this.initialSettings = checkNotNull(initialSettings, "initialSettings");
97 this.decoder = checkNotNull(decoder, "decoder");
98 this.encoder = checkNotNull(encoder, "encoder");
99 this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
100 this.flushPreface = flushPreface;
101 if (encoder.connection() != decoder.connection()) {
102 throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
103 }
104 }
105
106
107
108
109
110
111 public long gracefulShutdownTimeoutMillis() {
112 return gracefulShutdownTimeoutMillis;
113 }
114
115
116
117
118
119
120
121 public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
122 if (gracefulShutdownTimeoutMillis < -1) {
123 throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
124 " (expected: -1 for indefinite or >= 0)");
125 }
126 this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
127 }
128
129 public Http2Connection connection() {
130 return encoder.connection();
131 }
132
133 public Http2ConnectionDecoder decoder() {
134 return decoder;
135 }
136
137 public Http2ConnectionEncoder encoder() {
138 return encoder;
139 }
140
141 private boolean prefaceSent() {
142 return byteDecoder != null && byteDecoder.prefaceSent();
143 }
144
145
146
147
148
149 public void onHttpClientUpgrade() throws Http2Exception {
150 if (connection().isServer()) {
151 throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
152 }
153 if (!prefaceSent()) {
154
155
156 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
157 }
158 if (decoder.prefaceReceived()) {
159 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
160 }
161
162
163 connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
164 }
165
166
167
168
169
170 public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
171 if (!connection().isServer()) {
172 throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
173 }
174 if (!prefaceSent()) {
175
176
177 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
178 }
179 if (decoder.prefaceReceived()) {
180 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
181 }
182
183
184 encoder.remoteSettings(settings);
185
186
187 connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
188 }
189
190 @Override
191 public void flush(ChannelHandlerContext ctx) {
192 try {
193
194 encoder.flowController().writePendingBytes();
195 ctx.flush();
196 } catch (Http2Exception e) {
197 onError(ctx, true, e);
198 } catch (Throwable cause) {
199 onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
200 }
201 }
202
203 private abstract class BaseDecoder {
204 public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
205 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
206 public void channelActive(ChannelHandlerContext ctx) throws Exception { }
207
208 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
209
210 encoder().close();
211 decoder().close();
212
213
214
215 connection().close(ctx.voidPromise());
216 }
217
218
219
220
221 public boolean prefaceSent() {
222 return true;
223 }
224 }
225
226 private final class PrefaceDecoder extends BaseDecoder {
227 private ByteBuf clientPrefaceString;
228 private boolean prefaceSent;
229
230 PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
231 clientPrefaceString = clientPrefaceString(encoder.connection());
232
233
234 sendPreface(ctx);
235 }
236
237 @Override
238 public boolean prefaceSent() {
239 return prefaceSent;
240 }
241
242 @Override
243 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
244 try {
245 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
246
247 byteDecoder = new FrameDecoder();
248 byteDecoder.decode(ctx, in, out);
249 }
250 } catch (Throwable e) {
251 onError(ctx, false, e);
252 }
253 }
254
255 @Override
256 public void channelActive(ChannelHandlerContext ctx) throws Exception {
257
258 sendPreface(ctx);
259
260 if (flushPreface) {
261
262
263
264 ctx.flush();
265 }
266 }
267
268 @Override
269 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
270 cleanup();
271 super.channelInactive(ctx);
272 }
273
274
275
276
277 @Override
278 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
279 cleanup();
280 }
281
282
283
284
285 private void cleanup() {
286 if (clientPrefaceString != null) {
287 clientPrefaceString.release();
288 clientPrefaceString = null;
289 }
290 }
291
292
293
294
295
296
297
298 private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
299 if (clientPrefaceString == null) {
300 return true;
301 }
302
303 int prefaceRemaining = clientPrefaceString.readableBytes();
304 int bytesRead = min(in.readableBytes(), prefaceRemaining);
305
306
307 if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
308 clientPrefaceString, clientPrefaceString.readerIndex(),
309 bytesRead)) {
310 int maxSearch = 1024;
311 int http1Index =
312 ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
313 if (http1Index != -1) {
314 String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
315 throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
316 }
317 String receivedBytes = hexDump(in, in.readerIndex(),
318 min(in.readableBytes(), clientPrefaceString.readableBytes()));
319 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
320 "Hex dump for received bytes: %s", receivedBytes);
321 }
322 in.skipBytes(bytesRead);
323 clientPrefaceString.skipBytes(bytesRead);
324
325 if (!clientPrefaceString.isReadable()) {
326
327 clientPrefaceString.release();
328 clientPrefaceString = null;
329 return true;
330 }
331 return false;
332 }
333
334
335
336
337
338
339
340
341
342 private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
343 if (in.readableBytes() < 5) {
344
345 return false;
346 }
347
348 short frameType = in.getUnsignedByte(in.readerIndex() + 3);
349 short flags = in.getUnsignedByte(in.readerIndex() + 4);
350 if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {
351 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
352 "Hex dump for first 5 bytes: %s",
353 hexDump(in, in.readerIndex(), 5));
354 }
355 return true;
356 }
357
358
359
360
361 private void sendPreface(ChannelHandlerContext ctx) throws Exception {
362 if (prefaceSent || !ctx.channel().isActive()) {
363 return;
364 }
365
366 prefaceSent = true;
367
368 final boolean isClient = !connection().isServer();
369 if (isClient) {
370
371 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
372 }
373
374
375 encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
376 ChannelFutureListener.CLOSE_ON_FAILURE);
377
378 if (isClient) {
379
380
381
382 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
383 }
384 }
385 }
386
387 private final class FrameDecoder extends BaseDecoder {
388 @Override
389 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
390 try {
391 decoder.decodeFrame(ctx, in, out);
392 } catch (Throwable e) {
393 onError(ctx, false, e);
394 }
395 }
396 }
397
398 @Override
399 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
400
401 encoder.lifecycleManager(this);
402 decoder.lifecycleManager(this);
403 encoder.flowController().channelHandlerContext(ctx);
404 decoder.flowController().channelHandlerContext(ctx);
405 byteDecoder = new PrefaceDecoder(ctx);
406 }
407
408 @Override
409 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
410 if (byteDecoder != null) {
411 byteDecoder.handlerRemoved(ctx);
412 byteDecoder = null;
413 }
414 }
415
416 @Override
417 public void channelActive(ChannelHandlerContext ctx) throws Exception {
418 if (byteDecoder == null) {
419 byteDecoder = new PrefaceDecoder(ctx);
420 }
421 byteDecoder.channelActive(ctx);
422 super.channelActive(ctx);
423 }
424
425 @Override
426 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
427
428 super.channelInactive(ctx);
429 if (byteDecoder != null) {
430 byteDecoder.channelInactive(ctx);
431 byteDecoder = null;
432 }
433 }
434
435 @Override
436 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
437
438
439 try {
440 if (ctx.channel().isWritable()) {
441 flush(ctx);
442 }
443 encoder.flowController().channelWritabilityChanged();
444 } finally {
445 super.channelWritabilityChanged(ctx);
446 }
447 }
448
449 @Override
450 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
451 byteDecoder.decode(ctx, in, out);
452 }
453
454 @Override
455 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
456 ctx.bind(localAddress, promise);
457 }
458
459 @Override
460 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
461 ChannelPromise promise) throws Exception {
462 ctx.connect(remoteAddress, localAddress, promise);
463 }
464
465 @Override
466 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
467 ctx.disconnect(promise);
468 }
469
470 @Override
471 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
472 if (decoupleCloseAndGoAway) {
473 ctx.close(promise);
474 return;
475 }
476 promise = promise.unvoid();
477
478 if (!ctx.channel().isActive() || !prefaceSent()) {
479 ctx.close(promise);
480 return;
481 }
482
483
484
485
486
487
488 ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
489 ctx.flush();
490 doGracefulShutdown(ctx, f, promise);
491 }
492
493 private ChannelFutureListener newClosingChannelFutureListener(
494 ChannelHandlerContext ctx, ChannelPromise promise) {
495 long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
496 return gracefulShutdownTimeoutMillis < 0 ?
497 new ClosingChannelFutureListener(ctx, promise) :
498 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
499 }
500
501 private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
502 final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
503 if (isGracefulShutdownComplete()) {
504
505
506 future.addListener(listener);
507 } else {
508
509
510
511
512 if (closeListener == null) {
513 closeListener = listener;
514 } else if (promise != null) {
515 final ChannelFutureListener oldCloseListener = closeListener;
516 closeListener = new ChannelFutureListener() {
517 @Override
518 public void operationComplete(ChannelFuture future) throws Exception {
519 try {
520 oldCloseListener.operationComplete(future);
521 } finally {
522 listener.operationComplete(future);
523 }
524 }
525 };
526 }
527 }
528 }
529
530 @Override
531 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
532 ctx.deregister(promise);
533 }
534
535 @Override
536 public void read(ChannelHandlerContext ctx) throws Exception {
537 ctx.read();
538 }
539
540 @Override
541 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
542 ctx.write(msg, promise);
543 }
544
545 @Override
546 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
547
548
549 try {
550
551 channelReadComplete0(ctx);
552 } finally {
553 flush(ctx);
554 }
555 }
556
557 final void channelReadComplete0(ChannelHandlerContext ctx) {
558
559 discardSomeReadBytes();
560
561
562
563
564 if (!ctx.channel().config().isAutoRead()) {
565 ctx.read();
566 }
567
568 ctx.fireChannelReadComplete();
569 }
570
571
572
573
574 @Override
575 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
576 if (getEmbeddedHttp2Exception(cause) != null) {
577
578 onError(ctx, false, cause);
579 } else {
580 super.exceptionCaught(ctx, cause);
581 }
582 }
583
584
585
586
587
588
589
590
591 @Override
592 public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
593 switch (stream.state()) {
594 case HALF_CLOSED_LOCAL:
595 case OPEN:
596 stream.closeLocalSide();
597 break;
598 default:
599 closeStream(stream, future);
600 break;
601 }
602 }
603
604
605
606
607
608
609
610
611 @Override
612 public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
613 switch (stream.state()) {
614 case HALF_CLOSED_REMOTE:
615 case OPEN:
616 stream.closeRemoteSide();
617 break;
618 default:
619 closeStream(stream, future);
620 break;
621 }
622 }
623
624 @Override
625 public void closeStream(final Http2Stream stream, ChannelFuture future) {
626 if (future.isDone()) {
627 doCloseStream(stream, future);
628 } else {
629 future.addListener(new ChannelFutureListener() {
630 @Override
631 public void operationComplete(ChannelFuture future) {
632 doCloseStream(stream, future);
633 }
634 });
635 }
636 }
637
638
639
640
641 @Override
642 public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
643 Http2Exception embedded = getEmbeddedHttp2Exception(cause);
644 if (isStreamError(embedded)) {
645 onStreamError(ctx, outbound, cause, (StreamException) embedded);
646 } else if (embedded instanceof CompositeStreamException) {
647 CompositeStreamException compositException = (CompositeStreamException) embedded;
648 for (StreamException streamException : compositException) {
649 onStreamError(ctx, outbound, cause, streamException);
650 }
651 } else {
652 onConnectionError(ctx, outbound, cause, embedded);
653 }
654 ctx.flush();
655 }
656
657
658
659
660
661
662 protected boolean isGracefulShutdownComplete() {
663 return connection().numActiveStreams() == 0;
664 }
665
666
667
668
669
670
671
672
673
674
675
676 protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
677 Throwable cause, Http2Exception http2Ex) {
678 if (http2Ex == null) {
679 http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
680 }
681
682 ChannelPromise promise = ctx.newPromise();
683 ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
684 if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
685 doGracefulShutdown(ctx, future, promise);
686 } else {
687 future.addListener(newClosingChannelFutureListener(ctx, promise));
688 }
689 }
690
691
692
693
694
695
696
697
698
699
700 protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
701 @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
702 final int streamId = http2Ex.streamId();
703 Http2Stream stream = connection().stream(streamId);
704
705
706 if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
707 ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
708 connection().isServer()) {
709
710
711
712
713
714
715
716 if (stream == null) {
717 try {
718 stream = encoder.connection().remote().createStream(streamId, true);
719 } catch (Http2Exception e) {
720 resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
721 return;
722 }
723 }
724
725
726 if (stream != null && !stream.isHeadersSent()) {
727 try {
728 handleServerHeaderDecodeSizeError(ctx, stream);
729 } catch (Throwable cause2) {
730 onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
731 }
732 }
733 }
734
735 if (stream == null) {
736 if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
737 resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
738 }
739 } else {
740 resetStream(ctx, stream, http2Ex.error().code(), ctx.newPromise());
741 }
742 }
743
744
745
746
747
748
749
750
751 protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
752 encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
753 }
754
755 protected Http2FrameWriter frameWriter() {
756 return encoder().frameWriter();
757 }
758
759
760
761
762
763
764 private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
765 ChannelPromise promise) {
766 ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
767 if (future.isDone()) {
768 closeConnectionOnError(ctx, future);
769 } else {
770 future.addListener(new ChannelFutureListener() {
771 @Override
772 public void operationComplete(ChannelFuture future) throws Exception {
773 closeConnectionOnError(ctx, future);
774 }
775 });
776 }
777 return future;
778 }
779
780 @Override
781 public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
782 ChannelPromise promise) {
783 final Http2Stream stream = connection().stream(streamId);
784 if (stream == null) {
785 return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
786 }
787
788 return resetStream(ctx, stream, errorCode, promise);
789 }
790
791 private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
792 long errorCode, ChannelPromise promise) {
793 promise = promise.unvoid();
794 if (stream.isResetSent()) {
795
796 return promise.setSuccess();
797 }
798
799
800
801
802
803 stream.resetSent();
804
805 final ChannelFuture future;
806
807
808 if (stream.state() == IDLE ||
809 connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
810 future = promise.setSuccess();
811 } else {
812 future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
813 }
814 if (future.isDone()) {
815 processRstStreamWriteResult(ctx, stream, future);
816 } else {
817 future.addListener(new ChannelFutureListener() {
818 @Override
819 public void operationComplete(ChannelFuture future) throws Exception {
820 processRstStreamWriteResult(ctx, stream, future);
821 }
822 });
823 }
824
825 return future;
826 }
827
828 @Override
829 public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
830 final ByteBuf debugData, ChannelPromise promise) {
831 promise = promise.unvoid();
832 final Http2Connection connection = connection();
833 try {
834 if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
835 debugData.release();
836 promise.trySuccess();
837 return promise;
838 }
839 } catch (Throwable cause) {
840 debugData.release();
841 promise.tryFailure(cause);
842 return promise;
843 }
844
845
846
847 debugData.retain();
848 ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
849
850 if (future.isDone()) {
851 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
852 } else {
853 future.addListener(new ChannelFutureListener() {
854 @Override
855 public void operationComplete(ChannelFuture future) throws Exception {
856 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
857 }
858 });
859 }
860
861 return future;
862 }
863
864
865
866
867
868 private void checkCloseConnection(ChannelFuture future) {
869
870
871 if (closeListener != null && isGracefulShutdownComplete()) {
872 ChannelFutureListener closeListener = this.closeListener;
873
874
875 this.closeListener = null;
876 try {
877 closeListener.operationComplete(future);
878 } catch (Exception e) {
879 throw new IllegalStateException("Close listener threw an unexpected exception", e);
880 }
881 }
882 }
883
884
885
886
887
888 private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
889 long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
890 int lastKnownStream;
891 if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
892
893
894
895
896 lastKnownStream = Integer.MAX_VALUE;
897 } else {
898 lastKnownStream = connection().remote().lastStreamCreated();
899 }
900 return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
901 }
902
903 private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
904 if (future.isSuccess()) {
905 closeStream(stream, future);
906 } else {
907
908 onConnectionError(ctx, true, future.cause(), null);
909 }
910 }
911
912 private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
913 if (!future.isSuccess()) {
914 onConnectionError(ctx, true, future.cause(), null);
915 }
916 }
917
918 private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
919 stream.close();
920 checkCloseConnection(future);
921 }
922
923
924
925
926 private static ByteBuf clientPrefaceString(Http2Connection connection) {
927 return connection.isServer() ? connectionPrefaceBuf() : null;
928 }
929
930 private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
931 final long errorCode, final ByteBuf debugData, ChannelFuture future) {
932 try {
933 if (future.isSuccess()) {
934 if (errorCode != NO_ERROR.code()) {
935 if (logger.isDebugEnabled()) {
936 logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
937 "debugData '{}'. Forcing shutdown of the connection.",
938 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
939 }
940 ctx.close();
941 }
942 } else {
943 if (logger.isDebugEnabled()) {
944 logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
945 "debugData '{}'. Forcing shutdown of the connection.",
946 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
947 }
948 ctx.close();
949 }
950 } finally {
951
952 debugData.release();
953 }
954 }
955
956
957
958
959 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
960 private final ChannelHandlerContext ctx;
961 private final ChannelPromise promise;
962 private final Future<?> timeoutTask;
963 private boolean closed;
964
965 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
966 this.ctx = ctx;
967 this.promise = promise;
968 timeoutTask = null;
969 }
970
971 ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
972 long timeout, TimeUnit unit) {
973 this.ctx = ctx;
974 this.promise = promise;
975 timeoutTask = ctx.executor().schedule(new Runnable() {
976 @Override
977 public void run() {
978 doClose();
979 }
980 }, timeout, unit);
981 }
982
983 @Override
984 public void operationComplete(ChannelFuture sentGoAwayFuture) {
985 if (timeoutTask != null) {
986 timeoutTask.cancel(false);
987 }
988 doClose();
989 }
990
991 private void doClose() {
992
993
994 if (closed) {
995
996 assert timeoutTask != null;
997 return;
998 }
999 closed = true;
1000 if (promise == null) {
1001 ctx.close();
1002 } else {
1003 ctx.close(promise);
1004 }
1005 }
1006 }
1007 }