1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelOutboundHandler;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.http.HttpResponseStatus;
27 import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28 import io.netty.handler.codec.http2.Http2Exception.StreamException;
29 import io.netty.util.CharsetUtil;
30 import io.netty.util.concurrent.Future;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.net.SocketAddress;
35 import java.util.List;
36 import java.util.concurrent.TimeUnit;
37
38 import static io.netty.buffer.ByteBufUtil.hexDump;
39 import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
40 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
41 import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
42 import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
43 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44 import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
45 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
46 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
47 import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
48 import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
49 import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
50 import static io.netty.util.CharsetUtil.UTF_8;
51 import static io.netty.util.internal.ObjectUtil.checkNotNull;
52 import static java.lang.Math.min;
53 import static java.util.concurrent.TimeUnit.MILLISECONDS;
54
55
56
57
58
59
60
61
62
63
64 public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
65 ChannelOutboundHandler {
66
67 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
68
69 private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
70 HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
71 private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
72 Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();
73
74 private final Http2ConnectionDecoder decoder;
75 private final Http2ConnectionEncoder encoder;
76 private final Http2Settings initialSettings;
77 private final boolean decoupleCloseAndGoAway;
78 private final boolean flushPreface;
79 private ChannelFutureListener closeListener;
80 private BaseDecoder byteDecoder;
81 private long gracefulShutdownTimeoutMillis;
82
83 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
84 Http2Settings initialSettings) {
85 this(decoder, encoder, initialSettings, false);
86 }
87
88 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
89 Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
90 this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
91 }
92
93 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
94 Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
95 boolean flushPreface) {
96 this.initialSettings = checkNotNull(initialSettings, "initialSettings");
97 this.decoder = checkNotNull(decoder, "decoder");
98 this.encoder = checkNotNull(encoder, "encoder");
99 this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
100 this.flushPreface = flushPreface;
101 if (encoder.connection() != decoder.connection()) {
102 throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
103 }
104 }
105
106
107
108
109
110
111 public long gracefulShutdownTimeoutMillis() {
112 return gracefulShutdownTimeoutMillis;
113 }
114
115
116
117
118
119
120
121 public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
122 if (gracefulShutdownTimeoutMillis < -1) {
123 throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
124 " (expected: -1 for indefinite or >= 0)");
125 }
126 this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
127 }
128
129 public Http2Connection connection() {
130 return encoder.connection();
131 }
132
133 public Http2ConnectionDecoder decoder() {
134 return decoder;
135 }
136
137 public Http2ConnectionEncoder encoder() {
138 return encoder;
139 }
140
141 private boolean prefaceSent() {
142 return byteDecoder != null && byteDecoder.prefaceSent();
143 }
144
145
146
147
148
149 public void onHttpClientUpgrade() throws Http2Exception {
150 if (connection().isServer()) {
151 throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
152 }
153 if (!prefaceSent()) {
154
155
156 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
157 }
158 if (decoder.prefaceReceived()) {
159 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
160 }
161
162
163 connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
164 }
165
166
167
168
169
170 public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
171 if (!connection().isServer()) {
172 throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
173 }
174 if (!prefaceSent()) {
175
176
177 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
178 }
179 if (decoder.prefaceReceived()) {
180 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
181 }
182
183
184 encoder.remoteSettings(settings);
185
186
187 connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
188 }
189
190 @Override
191 public void flush(ChannelHandlerContext ctx) {
192 try {
193
194 encoder.flowController().writePendingBytes();
195 ctx.flush();
196 } catch (Http2Exception e) {
197 onError(ctx, true, e);
198 } catch (Throwable cause) {
199 onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
200 }
201 }
202
203 private abstract class BaseDecoder {
204 public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
205 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
206 public void channelActive(ChannelHandlerContext ctx) throws Exception { }
207
208 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
209
210 encoder().close();
211 decoder().close();
212
213
214
215 connection().close(ctx.voidPromise());
216 }
217
218
219
220
221 public boolean prefaceSent() {
222 return true;
223 }
224 }
225
226 private final class PrefaceDecoder extends BaseDecoder {
227 private ByteBuf clientPrefaceString;
228 private boolean prefaceSent;
229
230 PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
231 clientPrefaceString = clientPrefaceString(encoder.connection());
232
233
234 sendPreface(ctx);
235 }
236
237 @Override
238 public boolean prefaceSent() {
239 return prefaceSent;
240 }
241
242 @Override
243 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
244 try {
245 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
246
247 byteDecoder = new FrameDecoder();
248 byteDecoder.decode(ctx, in, out);
249 }
250 } catch (Throwable e) {
251 if (byteDecoder != null) {
252
253 in.skipBytes(in.readableBytes());
254 }
255 onError(ctx, false, e);
256 }
257 }
258
259 @Override
260 public void channelActive(ChannelHandlerContext ctx) throws Exception {
261
262 sendPreface(ctx);
263 }
264
265 @Override
266 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
267 cleanup();
268 super.channelInactive(ctx);
269 }
270
271
272
273
274 @Override
275 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
276 cleanup();
277 }
278
279
280
281
282 private void cleanup() {
283 if (clientPrefaceString != null) {
284 clientPrefaceString.release();
285 clientPrefaceString = null;
286 }
287 }
288
289
290
291
292
293
294
295 private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
296 if (clientPrefaceString == null) {
297 return true;
298 }
299
300 int prefaceRemaining = clientPrefaceString.readableBytes();
301 int bytesRead = min(in.readableBytes(), prefaceRemaining);
302
303
304 if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
305 clientPrefaceString, clientPrefaceString.readerIndex(),
306 bytesRead)) {
307 int maxSearch = 1024;
308 int http1Index =
309 ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
310 if (http1Index != -1) {
311 String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
312 throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
313 }
314 String receivedBytes = hexDump(in, in.readerIndex(),
315 min(in.readableBytes(), clientPrefaceString.readableBytes()));
316 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
317 "Hex dump for received bytes: %s", receivedBytes);
318 }
319 in.skipBytes(bytesRead);
320 clientPrefaceString.skipBytes(bytesRead);
321
322 if (!clientPrefaceString.isReadable()) {
323
324 clientPrefaceString.release();
325 clientPrefaceString = null;
326 return true;
327 }
328 return false;
329 }
330
331
332
333
334
335
336
337
338
339 private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
340 if (in.readableBytes() < 5) {
341
342 return false;
343 }
344
345 short frameType = in.getUnsignedByte(in.readerIndex() + 3);
346 if (frameType != SETTINGS) {
347 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
348 "Hex dump for first 5 bytes: %s",
349 hexDump(in, in.readerIndex(), 5));
350 }
351 short flags = in.getUnsignedByte(in.readerIndex() + 4);
352 if ((flags & Http2Flags.ACK) != 0) {
353 throw connectionError(PROTOCOL_ERROR, "First received frame was SETTINGS frame but had ACK flag set. " +
354 "Hex dump for first 5 bytes: %s",
355 hexDump(in, in.readerIndex(), 5));
356 }
357 return true;
358 }
359
360
361
362
363 private void sendPreface(ChannelHandlerContext ctx) throws Exception {
364 if (prefaceSent || !ctx.channel().isActive()) {
365 return;
366 }
367
368 prefaceSent = true;
369
370 final boolean isClient = !connection().isServer();
371 if (isClient) {
372
373 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
374 }
375
376
377 encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
378 ChannelFutureListener.CLOSE_ON_FAILURE);
379
380 try {
381 if (isClient) {
382
383
384
385 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
386 }
387 } finally {
388 if (flushPreface) {
389
390
391
392 ctx.flush();
393 }
394 }
395 }
396 }
397
398 private final class FrameDecoder extends BaseDecoder {
399 @Override
400 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
401 try {
402 decoder.decodeFrame(ctx, in, out);
403 } catch (Throwable e) {
404 onError(ctx, false, e);
405 }
406 }
407 }
408
409 @Override
410 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
411
412 encoder.lifecycleManager(this);
413 decoder.lifecycleManager(this);
414 encoder.flowController().channelHandlerContext(ctx);
415 decoder.flowController().channelHandlerContext(ctx);
416 byteDecoder = new PrefaceDecoder(ctx);
417 }
418
419 @Override
420 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
421 if (byteDecoder != null) {
422 byteDecoder.handlerRemoved(ctx);
423 byteDecoder = null;
424 }
425 }
426
427 @Override
428 public void channelActive(ChannelHandlerContext ctx) throws Exception {
429 if (byteDecoder == null) {
430 byteDecoder = new PrefaceDecoder(ctx);
431 }
432 byteDecoder.channelActive(ctx);
433 super.channelActive(ctx);
434 }
435
436 @Override
437 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
438
439 super.channelInactive(ctx);
440 if (byteDecoder != null) {
441 byteDecoder.channelInactive(ctx);
442 byteDecoder = null;
443 }
444 }
445
446 @Override
447 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
448
449
450 try {
451 if (ctx.channel().isWritable()) {
452 flush(ctx);
453 }
454 encoder.flowController().channelWritabilityChanged();
455 } finally {
456 super.channelWritabilityChanged(ctx);
457 }
458 }
459
460 @Override
461 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
462 byteDecoder.decode(ctx, in, out);
463 }
464
465 @Override
466 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
467 ctx.bind(localAddress, promise);
468 }
469
470 @Override
471 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
472 ChannelPromise promise) throws Exception {
473 ctx.connect(remoteAddress, localAddress, promise);
474 }
475
476 @Override
477 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
478 ctx.disconnect(promise);
479 }
480
481 @Override
482 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
483 if (decoupleCloseAndGoAway) {
484 ctx.close(promise);
485 return;
486 }
487 promise = promise.unvoid();
488
489 if (!ctx.channel().isActive() || !prefaceSent()) {
490 ctx.close(promise);
491 return;
492 }
493
494
495
496
497
498
499 ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
500 ctx.flush();
501 doGracefulShutdown(ctx, f, promise);
502 }
503
504 private ChannelFutureListener newClosingChannelFutureListener(
505 ChannelHandlerContext ctx, ChannelPromise promise) {
506 long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
507 return gracefulShutdownTimeoutMillis < 0 ?
508 new ClosingChannelFutureListener(ctx, promise) :
509 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
510 }
511
512 private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
513 final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
514 if (isGracefulShutdownComplete()) {
515
516
517 future.addListener(listener);
518 } else {
519
520
521
522
523 if (closeListener == null) {
524 closeListener = listener;
525 } else if (promise != null) {
526 final ChannelFutureListener oldCloseListener = closeListener;
527 closeListener = new ChannelFutureListener() {
528 @Override
529 public void operationComplete(ChannelFuture future) throws Exception {
530 try {
531 oldCloseListener.operationComplete(future);
532 } finally {
533 listener.operationComplete(future);
534 }
535 }
536 };
537 }
538 }
539 }
540
541 @Override
542 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
543 ctx.deregister(promise);
544 }
545
546 @Override
547 public void read(ChannelHandlerContext ctx) throws Exception {
548 ctx.read();
549 }
550
551 @Override
552 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
553 ctx.write(msg, promise);
554 }
555
556 @Override
557 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
558
559
560 try {
561
562 channelReadComplete0(ctx);
563 } finally {
564 flush(ctx);
565 }
566 }
567
568 final void channelReadComplete0(ChannelHandlerContext ctx) {
569
570 discardSomeReadBytes();
571
572
573
574
575 if (!ctx.channel().config().isAutoRead()) {
576 ctx.read();
577 }
578
579 ctx.fireChannelReadComplete();
580 }
581
582
583
584
585 @Override
586 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
587 if (getEmbeddedHttp2Exception(cause) != null) {
588
589 onError(ctx, false, cause);
590 } else {
591 super.exceptionCaught(ctx, cause);
592 }
593 }
594
595
596
597
598
599
600
601
602 @Override
603 public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
604 switch (stream.state()) {
605 case HALF_CLOSED_LOCAL:
606 case OPEN:
607 stream.closeLocalSide();
608 break;
609 default:
610 closeStream(stream, future);
611 break;
612 }
613 }
614
615
616
617
618
619
620
621
622 @Override
623 public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
624 switch (stream.state()) {
625 case HALF_CLOSED_REMOTE:
626 case OPEN:
627 stream.closeRemoteSide();
628 break;
629 default:
630 closeStream(stream, future);
631 break;
632 }
633 }
634
635 @Override
636 public void closeStream(final Http2Stream stream, ChannelFuture future) {
637 if (future.isDone()) {
638 doCloseStream(stream, future);
639 } else {
640 future.addListener(new ChannelFutureListener() {
641 @Override
642 public void operationComplete(ChannelFuture future) {
643 doCloseStream(stream, future);
644 }
645 });
646 }
647 }
648
649
650
651
652 @Override
653 public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
654 Http2Exception embedded = getEmbeddedHttp2Exception(cause);
655 if (isStreamError(embedded)) {
656 onStreamError(ctx, outbound, cause, (StreamException) embedded);
657 } else if (embedded instanceof CompositeStreamException) {
658 CompositeStreamException compositException = (CompositeStreamException) embedded;
659 for (StreamException streamException : compositException) {
660 onStreamError(ctx, outbound, cause, streamException);
661 }
662 } else {
663 onConnectionError(ctx, outbound, cause, embedded);
664 }
665 ctx.flush();
666 }
667
668
669
670
671
672
673 protected boolean isGracefulShutdownComplete() {
674 return connection().numActiveStreams() == 0;
675 }
676
677
678
679
680
681
682
683
684
685
686
687 protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
688 Throwable cause, Http2Exception http2Ex) {
689 if (http2Ex == null) {
690 http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
691 }
692
693 ChannelPromise promise = ctx.newPromise();
694 ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
695 if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
696 doGracefulShutdown(ctx, future, promise);
697 } else {
698 future.addListener(newClosingChannelFutureListener(ctx, promise));
699 }
700 }
701
702
703
704
705
706
707
708
709
710
711 protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
712 @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
713 final int streamId = http2Ex.streamId();
714 Http2Stream stream = connection().stream(streamId);
715
716
717 if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
718 ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
719 connection().isServer()) {
720
721
722
723
724
725
726
727 if (stream == null) {
728 try {
729 stream = encoder.connection().remote().createStream(streamId, true);
730 } catch (Http2Exception e) {
731 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
732 return;
733 }
734 }
735
736
737 if (stream != null && !stream.isHeadersSent()) {
738 try {
739 handleServerHeaderDecodeSizeError(ctx, stream);
740 } catch (Throwable cause2) {
741 onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
742 }
743 }
744 }
745
746 if (stream == null) {
747 if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
748 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
749 }
750 } else {
751 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
752 }
753 }
754
755
756
757
758
759
760
761
762 protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
763 encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
764 }
765
766 protected Http2FrameWriter frameWriter() {
767 return encoder().frameWriter();
768 }
769
770
771
772
773
774
775 private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
776 ChannelPromise promise) {
777 ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
778 if (future.isDone()) {
779 closeConnectionOnError(ctx, future);
780 } else {
781 future.addListener(new ChannelFutureListener() {
782 @Override
783 public void operationComplete(ChannelFuture future) throws Exception {
784 closeConnectionOnError(ctx, future);
785 }
786 });
787 }
788 return future;
789 }
790
791 @Override
792 public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
793 ChannelPromise promise) {
794 final Http2Stream stream = connection().stream(streamId);
795 if (stream == null) {
796 return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
797 }
798
799 return resetStream(ctx, stream, errorCode, promise);
800 }
801
802 private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
803 long errorCode, ChannelPromise promise) {
804 promise = promise.unvoid();
805 if (stream.isResetSent()) {
806
807 return promise.setSuccess();
808 }
809
810
811
812
813
814 stream.resetSent();
815
816 final ChannelFuture future;
817
818
819 if (stream.state() == IDLE ||
820 connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
821 future = promise.setSuccess();
822 } else {
823 future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
824 }
825 if (future.isDone()) {
826 processRstStreamWriteResult(ctx, stream, future);
827 } else {
828 future.addListener(new ChannelFutureListener() {
829 @Override
830 public void operationComplete(ChannelFuture future) throws Exception {
831 processRstStreamWriteResult(ctx, stream, future);
832 }
833 });
834 }
835
836 return future;
837 }
838
839 @Override
840 public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
841 final ByteBuf debugData, ChannelPromise promise) {
842 promise = promise.unvoid();
843 final Http2Connection connection = connection();
844 try {
845 if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
846 debugData.release();
847 promise.trySuccess();
848 return promise;
849 }
850 } catch (Throwable cause) {
851 debugData.release();
852 promise.tryFailure(cause);
853 return promise;
854 }
855
856
857
858 debugData.retain();
859 ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
860
861 if (future.isDone()) {
862 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
863 } else {
864 future.addListener(new ChannelFutureListener() {
865 @Override
866 public void operationComplete(ChannelFuture future) throws Exception {
867 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
868 }
869 });
870 }
871
872 return future;
873 }
874
875
876
877
878
879 private void checkCloseConnection(ChannelFuture future) {
880
881
882 if (closeListener != null && isGracefulShutdownComplete()) {
883 ChannelFutureListener closeListener = this.closeListener;
884
885
886 this.closeListener = null;
887 try {
888 closeListener.operationComplete(future);
889 } catch (Exception e) {
890 throw new IllegalStateException("Close listener threw an unexpected exception", e);
891 }
892 }
893 }
894
895
896
897
898
899 private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
900 long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
901 int lastKnownStream;
902 if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
903
904
905
906
907 lastKnownStream = Integer.MAX_VALUE;
908 } else {
909 lastKnownStream = connection().remote().lastStreamCreated();
910 }
911 return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
912 }
913
914 private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
915 if (future.isSuccess()) {
916 closeStream(stream, future);
917 } else {
918
919 onConnectionError(ctx, true, future.cause(), null);
920 }
921 }
922
923 private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
924 if (!future.isSuccess()) {
925 onConnectionError(ctx, true, future.cause(), null);
926 }
927 }
928
929 private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
930 stream.close();
931 checkCloseConnection(future);
932 }
933
934
935
936
937 private static ByteBuf clientPrefaceString(Http2Connection connection) {
938 return connection.isServer() ? connectionPrefaceBuf() : null;
939 }
940
941 private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
942 final long errorCode, final ByteBuf debugData, ChannelFuture future) {
943 try {
944 if (future.isSuccess()) {
945 if (errorCode != NO_ERROR.code()) {
946 if (logger.isDebugEnabled()) {
947 logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
948 "debugData '{}'. Forcing shutdown of the connection.",
949 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
950 }
951 ctx.close();
952 }
953 } else {
954 if (logger.isDebugEnabled()) {
955 logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
956 "debugData '{}'. Forcing shutdown of the connection.",
957 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
958 }
959 ctx.close();
960 }
961 } finally {
962
963 debugData.release();
964 }
965 }
966
967
968
969
970 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
971 private final ChannelHandlerContext ctx;
972 private final ChannelPromise promise;
973 private final Future<?> timeoutTask;
974 private boolean closed;
975
976 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
977 this.ctx = ctx;
978 this.promise = promise;
979 timeoutTask = null;
980 }
981
982 ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
983 long timeout, TimeUnit unit) {
984 this.ctx = ctx;
985 this.promise = promise;
986 timeoutTask = ctx.executor().schedule(new Runnable() {
987 @Override
988 public void run() {
989 doClose();
990 }
991 }, timeout, unit);
992 }
993
994 @Override
995 public void operationComplete(ChannelFuture sentGoAwayFuture) {
996 if (timeoutTask != null) {
997 timeoutTask.cancel(false);
998 }
999 doClose();
1000 }
1001
1002 private void doClose() {
1003
1004
1005 if (closed) {
1006
1007 assert timeoutTask != null;
1008 return;
1009 }
1010 closed = true;
1011 if (promise == null) {
1012 ctx.close();
1013 } else {
1014 ctx.close(promise);
1015 }
1016 }
1017 }
1018 }