1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelOutboundHandler;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.http.HttpResponseStatus;
27 import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28 import io.netty.handler.codec.http2.Http2Exception.StreamException;
29 import io.netty.util.CharsetUtil;
30 import io.netty.util.concurrent.Future;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.net.SocketAddress;
35 import java.util.List;
36 import java.util.concurrent.TimeUnit;
37
38 import static io.netty.buffer.ByteBufUtil.hexDump;
39 import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
40 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
41 import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
42 import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
43 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44 import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
45 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
46 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
47 import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
48 import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
49 import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
50 import static io.netty.util.CharsetUtil.UTF_8;
51 import static io.netty.util.internal.ObjectUtil.checkNotNull;
52 import static java.lang.Math.min;
53 import static java.util.concurrent.TimeUnit.MILLISECONDS;
54
55
56
57
58
59
60
61
62
63
64 public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
65 ChannelOutboundHandler {
66
67 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
68
69 private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
70 HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
71 private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
72 Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();
73
74 private final Http2ConnectionDecoder decoder;
75 private final Http2ConnectionEncoder encoder;
76 private final Http2Settings initialSettings;
77 private final boolean decoupleCloseAndGoAway;
78 private final boolean flushPreface;
79 private ChannelFutureListener closeListener;
80 private BaseDecoder byteDecoder;
81 private long gracefulShutdownTimeoutMillis;
82
83 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
84 Http2Settings initialSettings) {
85 this(decoder, encoder, initialSettings, false);
86 }
87
88 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
89 Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
90 this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
91 }
92
93 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
94 Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
95 boolean flushPreface) {
96 this.initialSettings = checkNotNull(initialSettings, "initialSettings");
97 this.decoder = checkNotNull(decoder, "decoder");
98 this.encoder = checkNotNull(encoder, "encoder");
99 this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
100 this.flushPreface = flushPreface;
101 if (encoder.connection() != decoder.connection()) {
102 throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
103 }
104 }
105
106
107
108
109
110
111 public long gracefulShutdownTimeoutMillis() {
112 return gracefulShutdownTimeoutMillis;
113 }
114
115
116
117
118
119
120
121 public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
122 if (gracefulShutdownTimeoutMillis < -1) {
123 throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
124 " (expected: -1 for indefinite or >= 0)");
125 }
126 this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
127 }
128
129 public Http2Connection connection() {
130 return encoder.connection();
131 }
132
133 public Http2ConnectionDecoder decoder() {
134 return decoder;
135 }
136
137 public Http2ConnectionEncoder encoder() {
138 return encoder;
139 }
140
141 private boolean prefaceSent() {
142 return byteDecoder != null && byteDecoder.prefaceSent();
143 }
144
145
146
147
148
149 public void onHttpClientUpgrade() throws Http2Exception {
150 if (connection().isServer()) {
151 throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
152 }
153 if (!prefaceSent()) {
154
155
156 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
157 }
158 if (decoder.prefaceReceived()) {
159 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
160 }
161
162
163 connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
164 }
165
166
167
168
169
170 public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
171 if (!connection().isServer()) {
172 throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
173 }
174 if (!prefaceSent()) {
175
176
177 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
178 }
179 if (decoder.prefaceReceived()) {
180 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
181 }
182
183
184 encoder.remoteSettings(settings);
185
186
187 connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
188 }
189
190 @Override
191 public void flush(ChannelHandlerContext ctx) {
192 try {
193
194 encoder.flowController().writePendingBytes();
195 ctx.flush();
196 } catch (Http2Exception e) {
197 onError(ctx, true, e);
198 } catch (Throwable cause) {
199 onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
200 }
201 }
202
203 private abstract class BaseDecoder {
204 public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
205 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
206 public void channelActive(ChannelHandlerContext ctx) throws Exception { }
207
208 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
209
210 encoder().close();
211 decoder().close();
212
213
214
215 connection().close(ctx.voidPromise());
216 }
217
218
219
220
221 public boolean prefaceSent() {
222 return true;
223 }
224 }
225
226 private final class PrefaceDecoder extends BaseDecoder {
227 private ByteBuf clientPrefaceString;
228 private boolean prefaceSent;
229
230 PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
231 clientPrefaceString = clientPrefaceString(encoder.connection());
232
233
234 sendPreface(ctx);
235 }
236
237 @Override
238 public boolean prefaceSent() {
239 return prefaceSent;
240 }
241
242 @Override
243 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
244 try {
245 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
246
247 byteDecoder = new FrameDecoder();
248 byteDecoder.decode(ctx, in, out);
249 }
250 } catch (Throwable e) {
251 if (byteDecoder != null) {
252
253 in.skipBytes(in.readableBytes());
254 }
255 onError(ctx, false, e);
256 }
257 }
258
259 @Override
260 public void channelActive(ChannelHandlerContext ctx) throws Exception {
261
262 sendPreface(ctx);
263
264 if (flushPreface) {
265
266
267
268 ctx.flush();
269 }
270 }
271
272 @Override
273 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
274 cleanup();
275 super.channelInactive(ctx);
276 }
277
278
279
280
281 @Override
282 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
283 cleanup();
284 }
285
286
287
288
289 private void cleanup() {
290 if (clientPrefaceString != null) {
291 clientPrefaceString.release();
292 clientPrefaceString = null;
293 }
294 }
295
296
297
298
299
300
301
302 private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
303 if (clientPrefaceString == null) {
304 return true;
305 }
306
307 int prefaceRemaining = clientPrefaceString.readableBytes();
308 int bytesRead = min(in.readableBytes(), prefaceRemaining);
309
310
311 if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
312 clientPrefaceString, clientPrefaceString.readerIndex(),
313 bytesRead)) {
314 int maxSearch = 1024;
315 int http1Index =
316 ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
317 if (http1Index != -1) {
318 String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
319 throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
320 }
321 String receivedBytes = hexDump(in, in.readerIndex(),
322 min(in.readableBytes(), clientPrefaceString.readableBytes()));
323 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
324 "Hex dump for received bytes: %s", receivedBytes);
325 }
326 in.skipBytes(bytesRead);
327 clientPrefaceString.skipBytes(bytesRead);
328
329 if (!clientPrefaceString.isReadable()) {
330
331 clientPrefaceString.release();
332 clientPrefaceString = null;
333 return true;
334 }
335 return false;
336 }
337
338
339
340
341
342
343
344
345
346 private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
347 if (in.readableBytes() < 5) {
348
349 return false;
350 }
351
352 short frameType = in.getUnsignedByte(in.readerIndex() + 3);
353 if (frameType != SETTINGS) {
354 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
355 "Hex dump for first 5 bytes: %s",
356 hexDump(in, in.readerIndex(), 5));
357 }
358 short flags = in.getUnsignedByte(in.readerIndex() + 4);
359 if ((flags & Http2Flags.ACK) != 0) {
360 throw connectionError(PROTOCOL_ERROR, "First received frame was SETTINGS frame but had ACK flag set. " +
361 "Hex dump for first 5 bytes: %s",
362 hexDump(in, in.readerIndex(), 5));
363 }
364 return true;
365 }
366
367
368
369
370 private void sendPreface(ChannelHandlerContext ctx) throws Exception {
371 if (prefaceSent || !ctx.channel().isActive()) {
372 return;
373 }
374
375 prefaceSent = true;
376
377 final boolean isClient = !connection().isServer();
378 if (isClient) {
379
380 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
381 }
382
383
384 encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
385 ChannelFutureListener.CLOSE_ON_FAILURE);
386
387 if (isClient) {
388
389
390
391 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
392 }
393 }
394 }
395
396 private final class FrameDecoder extends BaseDecoder {
397 @Override
398 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
399 try {
400 decoder.decodeFrame(ctx, in, out);
401 } catch (Throwable e) {
402 onError(ctx, false, e);
403 }
404 }
405 }
406
407 @Override
408 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
409
410 encoder.lifecycleManager(this);
411 decoder.lifecycleManager(this);
412 encoder.flowController().channelHandlerContext(ctx);
413 decoder.flowController().channelHandlerContext(ctx);
414 byteDecoder = new PrefaceDecoder(ctx);
415 }
416
417 @Override
418 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
419 if (byteDecoder != null) {
420 byteDecoder.handlerRemoved(ctx);
421 byteDecoder = null;
422 }
423 }
424
425 @Override
426 public void channelActive(ChannelHandlerContext ctx) throws Exception {
427 if (byteDecoder == null) {
428 byteDecoder = new PrefaceDecoder(ctx);
429 }
430 byteDecoder.channelActive(ctx);
431 super.channelActive(ctx);
432 }
433
434 @Override
435 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
436
437 super.channelInactive(ctx);
438 if (byteDecoder != null) {
439 byteDecoder.channelInactive(ctx);
440 byteDecoder = null;
441 }
442 }
443
444 @Override
445 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
446
447
448 try {
449 if (ctx.channel().isWritable()) {
450 flush(ctx);
451 }
452 encoder.flowController().channelWritabilityChanged();
453 } finally {
454 super.channelWritabilityChanged(ctx);
455 }
456 }
457
458 @Override
459 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
460 byteDecoder.decode(ctx, in, out);
461 }
462
463 @Override
464 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
465 ctx.bind(localAddress, promise);
466 }
467
468 @Override
469 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
470 ChannelPromise promise) throws Exception {
471 ctx.connect(remoteAddress, localAddress, promise);
472 }
473
474 @Override
475 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
476 ctx.disconnect(promise);
477 }
478
479 @Override
480 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
481 if (decoupleCloseAndGoAway) {
482 ctx.close(promise);
483 return;
484 }
485 promise = promise.unvoid();
486
487 if (!ctx.channel().isActive() || !prefaceSent()) {
488 ctx.close(promise);
489 return;
490 }
491
492
493
494
495
496
497 ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
498 ctx.flush();
499 doGracefulShutdown(ctx, f, promise);
500 }
501
502 private ChannelFutureListener newClosingChannelFutureListener(
503 ChannelHandlerContext ctx, ChannelPromise promise) {
504 long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
505 return gracefulShutdownTimeoutMillis < 0 ?
506 new ClosingChannelFutureListener(ctx, promise) :
507 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
508 }
509
510 private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
511 final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
512 if (isGracefulShutdownComplete()) {
513
514
515 future.addListener(listener);
516 } else {
517
518
519
520
521 if (closeListener == null) {
522 closeListener = listener;
523 } else if (promise != null) {
524 final ChannelFutureListener oldCloseListener = closeListener;
525 closeListener = new ChannelFutureListener() {
526 @Override
527 public void operationComplete(ChannelFuture future) throws Exception {
528 try {
529 oldCloseListener.operationComplete(future);
530 } finally {
531 listener.operationComplete(future);
532 }
533 }
534 };
535 }
536 }
537 }
538
539 @Override
540 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
541 ctx.deregister(promise);
542 }
543
544 @Override
545 public void read(ChannelHandlerContext ctx) throws Exception {
546 ctx.read();
547 }
548
549 @Override
550 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
551 ctx.write(msg, promise);
552 }
553
554 @Override
555 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
556
557
558 try {
559
560 channelReadComplete0(ctx);
561 } finally {
562 flush(ctx);
563 }
564 }
565
566 final void channelReadComplete0(ChannelHandlerContext ctx) {
567
568 discardSomeReadBytes();
569
570
571
572
573 if (!ctx.channel().config().isAutoRead()) {
574 ctx.read();
575 }
576
577 ctx.fireChannelReadComplete();
578 }
579
580
581
582
583 @Override
584 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
585 if (getEmbeddedHttp2Exception(cause) != null) {
586
587 onError(ctx, false, cause);
588 } else {
589 super.exceptionCaught(ctx, cause);
590 }
591 }
592
593
594
595
596
597
598
599
600 @Override
601 public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
602 switch (stream.state()) {
603 case HALF_CLOSED_LOCAL:
604 case OPEN:
605 stream.closeLocalSide();
606 break;
607 default:
608 closeStream(stream, future);
609 break;
610 }
611 }
612
613
614
615
616
617
618
619
620 @Override
621 public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
622 switch (stream.state()) {
623 case HALF_CLOSED_REMOTE:
624 case OPEN:
625 stream.closeRemoteSide();
626 break;
627 default:
628 closeStream(stream, future);
629 break;
630 }
631 }
632
633 @Override
634 public void closeStream(final Http2Stream stream, ChannelFuture future) {
635 if (future.isDone()) {
636 doCloseStream(stream, future);
637 } else {
638 future.addListener(new ChannelFutureListener() {
639 @Override
640 public void operationComplete(ChannelFuture future) {
641 doCloseStream(stream, future);
642 }
643 });
644 }
645 }
646
647
648
649
650 @Override
651 public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
652 Http2Exception embedded = getEmbeddedHttp2Exception(cause);
653 if (isStreamError(embedded)) {
654 onStreamError(ctx, outbound, cause, (StreamException) embedded);
655 } else if (embedded instanceof CompositeStreamException) {
656 CompositeStreamException compositException = (CompositeStreamException) embedded;
657 for (StreamException streamException : compositException) {
658 onStreamError(ctx, outbound, cause, streamException);
659 }
660 } else {
661 onConnectionError(ctx, outbound, cause, embedded);
662 }
663 ctx.flush();
664 }
665
666
667
668
669
670
671 protected boolean isGracefulShutdownComplete() {
672 return connection().numActiveStreams() == 0;
673 }
674
675
676
677
678
679
680
681
682
683
684
685 protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
686 Throwable cause, Http2Exception http2Ex) {
687 if (http2Ex == null) {
688 http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
689 }
690
691 ChannelPromise promise = ctx.newPromise();
692 ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
693 if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
694 doGracefulShutdown(ctx, future, promise);
695 } else {
696 future.addListener(newClosingChannelFutureListener(ctx, promise));
697 }
698 }
699
700
701
702
703
704
705
706
707
708
709 protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
710 @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
711 final int streamId = http2Ex.streamId();
712 Http2Stream stream = connection().stream(streamId);
713
714
715 if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
716 ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
717 connection().isServer()) {
718
719
720
721
722
723
724
725 if (stream == null) {
726 try {
727 stream = encoder.connection().remote().createStream(streamId, true);
728 } catch (Http2Exception e) {
729 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
730 return;
731 }
732 }
733
734
735 if (stream != null && !stream.isHeadersSent()) {
736 try {
737 handleServerHeaderDecodeSizeError(ctx, stream);
738 } catch (Throwable cause2) {
739 onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
740 }
741 }
742 }
743
744 if (stream == null) {
745 if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
746 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
747 }
748 } else {
749 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
750 }
751 }
752
753
754
755
756
757
758
759
760 protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
761 encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
762 }
763
764 protected Http2FrameWriter frameWriter() {
765 return encoder().frameWriter();
766 }
767
768
769
770
771
772
773 private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
774 ChannelPromise promise) {
775 ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
776 if (future.isDone()) {
777 closeConnectionOnError(ctx, future);
778 } else {
779 future.addListener(new ChannelFutureListener() {
780 @Override
781 public void operationComplete(ChannelFuture future) throws Exception {
782 closeConnectionOnError(ctx, future);
783 }
784 });
785 }
786 return future;
787 }
788
789 @Override
790 public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
791 ChannelPromise promise) {
792 final Http2Stream stream = connection().stream(streamId);
793 if (stream == null) {
794 return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
795 }
796
797 return resetStream(ctx, stream, errorCode, promise);
798 }
799
800 private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
801 long errorCode, ChannelPromise promise) {
802 promise = promise.unvoid();
803 if (stream.isResetSent()) {
804
805 return promise.setSuccess();
806 }
807
808
809
810
811
812 stream.resetSent();
813
814 final ChannelFuture future;
815
816
817 if (stream.state() == IDLE ||
818 connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
819 future = promise.setSuccess();
820 } else {
821 future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
822 }
823 if (future.isDone()) {
824 processRstStreamWriteResult(ctx, stream, future);
825 } else {
826 future.addListener(new ChannelFutureListener() {
827 @Override
828 public void operationComplete(ChannelFuture future) throws Exception {
829 processRstStreamWriteResult(ctx, stream, future);
830 }
831 });
832 }
833
834 return future;
835 }
836
837 @Override
838 public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
839 final ByteBuf debugData, ChannelPromise promise) {
840 promise = promise.unvoid();
841 final Http2Connection connection = connection();
842 try {
843 if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
844 debugData.release();
845 promise.trySuccess();
846 return promise;
847 }
848 } catch (Throwable cause) {
849 debugData.release();
850 promise.tryFailure(cause);
851 return promise;
852 }
853
854
855
856 debugData.retain();
857 ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
858
859 if (future.isDone()) {
860 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
861 } else {
862 future.addListener(new ChannelFutureListener() {
863 @Override
864 public void operationComplete(ChannelFuture future) throws Exception {
865 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
866 }
867 });
868 }
869
870 return future;
871 }
872
873
874
875
876
877 private void checkCloseConnection(ChannelFuture future) {
878
879
880 if (closeListener != null && isGracefulShutdownComplete()) {
881 ChannelFutureListener closeListener = this.closeListener;
882
883
884 this.closeListener = null;
885 try {
886 closeListener.operationComplete(future);
887 } catch (Exception e) {
888 throw new IllegalStateException("Close listener threw an unexpected exception", e);
889 }
890 }
891 }
892
893
894
895
896
897 private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
898 long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
899 int lastKnownStream;
900 if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
901
902
903
904
905 lastKnownStream = Integer.MAX_VALUE;
906 } else {
907 lastKnownStream = connection().remote().lastStreamCreated();
908 }
909 return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
910 }
911
912 private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
913 if (future.isSuccess()) {
914 closeStream(stream, future);
915 } else {
916
917 onConnectionError(ctx, true, future.cause(), null);
918 }
919 }
920
921 private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
922 if (!future.isSuccess()) {
923 onConnectionError(ctx, true, future.cause(), null);
924 }
925 }
926
927 private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
928 stream.close();
929 checkCloseConnection(future);
930 }
931
932
933
934
935 private static ByteBuf clientPrefaceString(Http2Connection connection) {
936 return connection.isServer() ? connectionPrefaceBuf() : null;
937 }
938
939 private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
940 final long errorCode, final ByteBuf debugData, ChannelFuture future) {
941 try {
942 if (future.isSuccess()) {
943 if (errorCode != NO_ERROR.code()) {
944 if (logger.isDebugEnabled()) {
945 logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
946 "debugData '{}'. Forcing shutdown of the connection.",
947 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
948 }
949 ctx.close();
950 }
951 } else {
952 if (logger.isDebugEnabled()) {
953 logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
954 "debugData '{}'. Forcing shutdown of the connection.",
955 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
956 }
957 ctx.close();
958 }
959 } finally {
960
961 debugData.release();
962 }
963 }
964
965
966
967
968 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
969 private final ChannelHandlerContext ctx;
970 private final ChannelPromise promise;
971 private final Future<?> timeoutTask;
972 private boolean closed;
973
974 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
975 this.ctx = ctx;
976 this.promise = promise;
977 timeoutTask = null;
978 }
979
980 ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
981 long timeout, TimeUnit unit) {
982 this.ctx = ctx;
983 this.promise = promise;
984 timeoutTask = ctx.executor().schedule(new Runnable() {
985 @Override
986 public void run() {
987 doClose();
988 }
989 }, timeout, unit);
990 }
991
992 @Override
993 public void operationComplete(ChannelFuture sentGoAwayFuture) {
994 if (timeoutTask != null) {
995 timeoutTask.cancel(false);
996 }
997 doClose();
998 }
999
1000 private void doClose() {
1001
1002
1003 if (closed) {
1004
1005 assert timeoutTask != null;
1006 return;
1007 }
1008 closed = true;
1009 if (promise == null) {
1010 ctx.close();
1011 } else {
1012 ctx.close(promise);
1013 }
1014 }
1015 }
1016 }