1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelOutboundHandler;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.http.HttpResponseStatus;
27 import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28 import io.netty.handler.codec.http2.Http2Exception.StreamException;
29 import io.netty.util.CharsetUtil;
30 import io.netty.util.concurrent.Future;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.net.SocketAddress;
35 import java.util.List;
36 import java.util.concurrent.TimeUnit;
37
38 import static io.netty.buffer.ByteBufUtil.hexDump;
39 import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
40 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
41 import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
42 import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
43 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44 import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
45 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
46 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
47 import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
48 import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
49 import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
50 import static io.netty.util.CharsetUtil.UTF_8;
51 import static io.netty.util.internal.ObjectUtil.checkNotNull;
52 import static java.lang.Math.min;
53 import static java.util.concurrent.TimeUnit.MILLISECONDS;
54
55
56
57
58
59
60
61
62
63
64 public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
65 ChannelOutboundHandler {
66
67 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
68
69 private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
70 HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
71 private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
72 Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();
73
74 private final Http2ConnectionDecoder decoder;
75 private final Http2ConnectionEncoder encoder;
76 private final Http2Settings initialSettings;
77 private final boolean decoupleCloseAndGoAway;
78 private final boolean flushPreface;
79 private ChannelFutureListener closeListener;
80 private BaseDecoder byteDecoder;
81 private long gracefulShutdownTimeoutMillis;
82
83 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
84 Http2Settings initialSettings) {
85 this(decoder, encoder, initialSettings, false);
86 }
87
88 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
89 Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
90 this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
91 }
92
93 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
94 Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
95 boolean flushPreface) {
96 this.initialSettings = checkNotNull(initialSettings, "initialSettings");
97 this.decoder = checkNotNull(decoder, "decoder");
98 this.encoder = checkNotNull(encoder, "encoder");
99 this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
100 this.flushPreface = flushPreface;
101 if (encoder.connection() != decoder.connection()) {
102 throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
103 }
104 }
105
106
107
108
109
110
111 public long gracefulShutdownTimeoutMillis() {
112 return gracefulShutdownTimeoutMillis;
113 }
114
115
116
117
118
119
120
121 public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
122 if (gracefulShutdownTimeoutMillis < -1) {
123 throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
124 " (expected: -1 for indefinite or >= 0)");
125 }
126 this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
127 }
128
129 public Http2Connection connection() {
130 return encoder.connection();
131 }
132
133 public Http2ConnectionDecoder decoder() {
134 return decoder;
135 }
136
137 public Http2ConnectionEncoder encoder() {
138 return encoder;
139 }
140
141 private boolean prefaceSent() {
142 return byteDecoder != null && byteDecoder.prefaceSent();
143 }
144
145
146
147
148
149 public void onHttpClientUpgrade() throws Http2Exception {
150 if (connection().isServer()) {
151 throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
152 }
153 if (!prefaceSent()) {
154
155
156 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
157 }
158 if (decoder.prefaceReceived()) {
159 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
160 }
161
162
163 connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
164 }
165
166
167
168
169
170 public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
171 if (!connection().isServer()) {
172 throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
173 }
174 if (!prefaceSent()) {
175
176
177 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
178 }
179 if (decoder.prefaceReceived()) {
180 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
181 }
182
183
184 encoder.remoteSettings(settings);
185
186
187 connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
188 }
189
190 @Override
191 public void flush(ChannelHandlerContext ctx) {
192 try {
193
194 encoder.flowController().writePendingBytes();
195 ctx.flush();
196 } catch (Http2Exception e) {
197 onError(ctx, true, e);
198 } catch (Throwable cause) {
199 onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
200 }
201 }
202
203 private abstract class BaseDecoder {
204 public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
205 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
206 public void channelActive(ChannelHandlerContext ctx) throws Exception { }
207
208 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
209
210 encoder().close();
211 decoder().close();
212
213
214
215 connection().close(ctx.voidPromise());
216 }
217
218
219
220
221 public boolean prefaceSent() {
222 return true;
223 }
224
225
226
227
228
229
230
231 public void sendPrefaceIfNeeded(ChannelHandlerContext ctx) throws Exception {
232
233 }
234 }
235
236 private final class PrefaceDecoder extends BaseDecoder {
237 private ByteBuf clientPrefaceString;
238 private boolean prefaceSent;
239
240 PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
241 clientPrefaceString = clientPrefaceString(encoder.connection());
242
243
244 sendPrefaceIfNeeded(ctx);
245 }
246
247 @Override
248 public boolean prefaceSent() {
249 return prefaceSent;
250 }
251
252 @Override
253 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
254 try {
255 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
256
257 byteDecoder = new FrameDecoder();
258 byteDecoder.decode(ctx, in, out);
259 }
260 } catch (Throwable e) {
261 if (byteDecoder != null) {
262
263 in.skipBytes(in.readableBytes());
264 }
265 onError(ctx, false, e);
266 }
267 }
268
269 @Override
270 public void channelActive(ChannelHandlerContext ctx) throws Exception {
271
272 sendPrefaceIfNeeded(ctx);
273 }
274
275 @Override
276 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
277 cleanup();
278 super.channelInactive(ctx);
279 }
280
281
282
283
284 @Override
285 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
286 cleanup();
287 }
288
289
290
291
292 private void cleanup() {
293 if (clientPrefaceString != null) {
294 clientPrefaceString.release();
295 clientPrefaceString = null;
296 }
297 }
298
299
300
301
302
303
304
305 private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
306 if (clientPrefaceString == null) {
307 return true;
308 }
309
310 int prefaceRemaining = clientPrefaceString.readableBytes();
311 int bytesRead = min(in.readableBytes(), prefaceRemaining);
312
313
314 if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
315 clientPrefaceString, clientPrefaceString.readerIndex(),
316 bytesRead)) {
317 int maxSearch = 1024;
318 int http1Index =
319 ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
320 if (http1Index != -1) {
321 String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
322 throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
323 }
324 String receivedBytes = hexDump(in, in.readerIndex(),
325 min(in.readableBytes(), clientPrefaceString.readableBytes()));
326 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
327 "Hex dump for received bytes: %s", receivedBytes);
328 }
329 in.skipBytes(bytesRead);
330 clientPrefaceString.skipBytes(bytesRead);
331
332 if (!clientPrefaceString.isReadable()) {
333
334 clientPrefaceString.release();
335 clientPrefaceString = null;
336 return true;
337 }
338 return false;
339 }
340
341
342
343
344
345
346
347
348
349 private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
350 if (in.readableBytes() < 5) {
351
352 return false;
353 }
354
355 short frameType = in.getUnsignedByte(in.readerIndex() + 3);
356 if (frameType != SETTINGS) {
357 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
358 "Hex dump for first 5 bytes: %s",
359 hexDump(in, in.readerIndex(), 5));
360 }
361 short flags = in.getUnsignedByte(in.readerIndex() + 4);
362 if ((flags & Http2Flags.ACK) != 0) {
363 throw connectionError(PROTOCOL_ERROR, "First received frame was SETTINGS frame but had ACK flag set. " +
364 "Hex dump for first 5 bytes: %s",
365 hexDump(in, in.readerIndex(), 5));
366 }
367 return true;
368 }
369
370
371
372
373 @Override
374 public void sendPrefaceIfNeeded(ChannelHandlerContext ctx) throws Exception {
375 if (prefaceSent || !ctx.channel().isActive()) {
376 return;
377 }
378
379 prefaceSent = true;
380
381 final boolean isClient = !connection().isServer();
382 if (isClient) {
383
384 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
385 }
386
387
388 encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
389 ChannelFutureListener.CLOSE_ON_FAILURE);
390
391 try {
392 if (isClient) {
393
394
395
396 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
397 }
398 } finally {
399 if (flushPreface) {
400
401
402
403 ctx.flush();
404 }
405 }
406 }
407 }
408
409 private final class FrameDecoder extends BaseDecoder {
410 @Override
411 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
412 try {
413 decoder.decodeFrame(ctx, in, out);
414 } catch (Throwable e) {
415 onError(ctx, false, e);
416 }
417 }
418 }
419
420 @Override
421 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
422
423 encoder.lifecycleManager(this);
424 decoder.lifecycleManager(this);
425 encoder.flowController().channelHandlerContext(ctx);
426 decoder.flowController().channelHandlerContext(ctx);
427 byteDecoder = new PrefaceDecoder(ctx);
428 }
429
430 @Override
431 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
432 if (byteDecoder != null) {
433 byteDecoder.handlerRemoved(ctx);
434 byteDecoder = null;
435 }
436 }
437
438 @Override
439 public void channelActive(ChannelHandlerContext ctx) throws Exception {
440 if (byteDecoder == null) {
441 byteDecoder = new PrefaceDecoder(ctx);
442 }
443 byteDecoder.channelActive(ctx);
444 super.channelActive(ctx);
445 }
446
447 @Override
448 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
449
450 super.channelInactive(ctx);
451 if (byteDecoder != null) {
452 byteDecoder.channelInactive(ctx);
453 byteDecoder = null;
454 }
455 }
456
457 @Override
458 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
459
460
461 try {
462 if (ctx.channel().isWritable()) {
463 flush(ctx);
464 }
465 encoder.flowController().channelWritabilityChanged();
466 } finally {
467 super.channelWritabilityChanged(ctx);
468 }
469 }
470
471 @Override
472 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
473 byteDecoder.decode(ctx, in, out);
474 }
475
476 @Override
477 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
478
479
480
481 ctx.bind(localAddress, ctx.newPromise()).addListener(new PrefaceSendListener(ctx, promise));
482 }
483
484 @Override
485 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
486 ChannelPromise promise) throws Exception {
487
488
489
490 ctx.connect(remoteAddress, localAddress, ctx.newPromise()).addListener(new PrefaceSendListener(ctx, promise));
491 }
492
493 @Override
494 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
495 ctx.disconnect(promise);
496 }
497
498 @Override
499 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
500 if (decoupleCloseAndGoAway) {
501 ctx.close(promise);
502 return;
503 }
504 promise = promise.unvoid();
505
506 if (!ctx.channel().isActive() || !prefaceSent()) {
507 ctx.close(promise);
508 return;
509 }
510
511
512
513
514
515
516 ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
517 ctx.flush();
518 doGracefulShutdown(ctx, f, promise);
519 }
520
521 private ChannelFutureListener newClosingChannelFutureListener(
522 ChannelHandlerContext ctx, ChannelPromise promise) {
523 long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
524 return gracefulShutdownTimeoutMillis < 0 ?
525 new ClosingChannelFutureListener(ctx, promise) :
526 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
527 }
528
529 private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
530 final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
531 if (isGracefulShutdownComplete()) {
532
533
534 future.addListener(listener);
535 } else {
536
537
538
539
540 if (closeListener == null) {
541 closeListener = listener;
542 } else if (promise != null) {
543 final ChannelFutureListener oldCloseListener = closeListener;
544 closeListener = new ChannelFutureListener() {
545 @Override
546 public void operationComplete(ChannelFuture future) throws Exception {
547 try {
548 oldCloseListener.operationComplete(future);
549 } finally {
550 listener.operationComplete(future);
551 }
552 }
553 };
554 }
555 }
556 }
557
558 @Override
559 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
560 ctx.deregister(promise);
561 }
562
563 @Override
564 public void read(ChannelHandlerContext ctx) throws Exception {
565 ctx.read();
566 }
567
568 @Override
569 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
570 ctx.write(msg, promise);
571 }
572
573 @Override
574 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
575
576
577 try {
578
579 channelReadComplete0(ctx);
580 } finally {
581 flush(ctx);
582 }
583 }
584
585 final void channelReadComplete0(ChannelHandlerContext ctx) {
586
587 discardSomeReadBytes();
588
589
590
591
592 if (!ctx.channel().config().isAutoRead()) {
593 ctx.read();
594 }
595
596 ctx.fireChannelReadComplete();
597 }
598
599
600
601
602 @Override
603 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
604 if (getEmbeddedHttp2Exception(cause) != null) {
605
606 onError(ctx, false, cause);
607 } else {
608 super.exceptionCaught(ctx, cause);
609 }
610 }
611
612
613
614
615
616
617
618
619 @Override
620 public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
621 switch (stream.state()) {
622 case HALF_CLOSED_LOCAL:
623 case OPEN:
624 stream.closeLocalSide();
625 break;
626 default:
627 closeStream(stream, future);
628 break;
629 }
630 }
631
632
633
634
635
636
637
638
639 @Override
640 public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
641 switch (stream.state()) {
642 case HALF_CLOSED_REMOTE:
643 case OPEN:
644 stream.closeRemoteSide();
645 break;
646 default:
647 closeStream(stream, future);
648 break;
649 }
650 }
651
652 @Override
653 public void closeStream(final Http2Stream stream, ChannelFuture future) {
654 if (future.isDone()) {
655 doCloseStream(stream, future);
656 } else {
657 future.addListener(new ChannelFutureListener() {
658 @Override
659 public void operationComplete(ChannelFuture future) {
660 doCloseStream(stream, future);
661 }
662 });
663 }
664 }
665
666
667
668
669 @Override
670 public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
671 Http2Exception embedded = getEmbeddedHttp2Exception(cause);
672 if (isStreamError(embedded)) {
673 onStreamError(ctx, outbound, cause, (StreamException) embedded);
674 } else if (embedded instanceof CompositeStreamException) {
675 CompositeStreamException compositException = (CompositeStreamException) embedded;
676 for (StreamException streamException : compositException) {
677 onStreamError(ctx, outbound, cause, streamException);
678 }
679 } else {
680 onConnectionError(ctx, outbound, cause, embedded);
681 }
682 ctx.flush();
683 }
684
685
686
687
688
689
690 protected boolean isGracefulShutdownComplete() {
691 return connection().numActiveStreams() == 0;
692 }
693
694
695
696
697
698
699
700
701
702
703
704 protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
705 Throwable cause, Http2Exception http2Ex) {
706 if (http2Ex == null) {
707 http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
708 }
709
710 ChannelPromise promise = ctx.newPromise();
711 ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
712 if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
713 doGracefulShutdown(ctx, future, promise);
714 } else {
715 future.addListener(newClosingChannelFutureListener(ctx, promise));
716 }
717 }
718
719
720
721
722
723
724
725
726
727
728 protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
729 @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
730 final int streamId = http2Ex.streamId();
731 Http2Stream stream = connection().stream(streamId);
732
733
734 if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
735 ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
736 connection().isServer()) {
737
738
739
740
741
742
743
744 if (stream == null) {
745 try {
746 stream = encoder.connection().remote().createStream(streamId, true);
747 } catch (Http2Exception e) {
748 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
749 return;
750 }
751 }
752
753
754 if (stream != null && !stream.isHeadersSent()) {
755 try {
756 handleServerHeaderDecodeSizeError(ctx, stream);
757 } catch (Throwable cause2) {
758 onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
759 }
760 }
761 }
762
763 if (stream == null) {
764 if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
765 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
766 }
767 } else {
768 encoder().writeRstStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
769 }
770 }
771
772
773
774
775
776
777
778
779 protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
780 encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
781 }
782
783 protected Http2FrameWriter frameWriter() {
784 return encoder().frameWriter();
785 }
786
787
788
789
790
791
792 private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
793 ChannelPromise promise) {
794 ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
795 if (future.isDone()) {
796 closeConnectionOnError(ctx, future);
797 } else {
798 future.addListener(new ChannelFutureListener() {
799 @Override
800 public void operationComplete(ChannelFuture future) throws Exception {
801 closeConnectionOnError(ctx, future);
802 }
803 });
804 }
805 return future;
806 }
807
808 @Override
809 public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
810 ChannelPromise promise) {
811 final Http2Stream stream = connection().stream(streamId);
812 if (stream == null) {
813 return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
814 }
815
816 return resetStream(ctx, stream, errorCode, promise);
817 }
818
819 private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
820 long errorCode, ChannelPromise promise) {
821 promise = promise.unvoid();
822 if (stream.isResetSent()) {
823
824 return promise.setSuccess();
825 }
826
827
828
829
830
831 stream.resetSent();
832
833 final ChannelFuture future;
834
835
836 if (stream.state() == IDLE ||
837 connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
838 future = promise.setSuccess();
839 } else {
840 future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
841 }
842 if (future.isDone()) {
843 processRstStreamWriteResult(ctx, stream, future);
844 } else {
845 future.addListener(new ChannelFutureListener() {
846 @Override
847 public void operationComplete(ChannelFuture future) throws Exception {
848 processRstStreamWriteResult(ctx, stream, future);
849 }
850 });
851 }
852
853 return future;
854 }
855
856 @Override
857 public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
858 final ByteBuf debugData, ChannelPromise promise) {
859 promise = promise.unvoid();
860 final Http2Connection connection = connection();
861 try {
862 if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
863 debugData.release();
864 promise.trySuccess();
865 return promise;
866 }
867 } catch (Throwable cause) {
868 debugData.release();
869 promise.tryFailure(cause);
870 return promise;
871 }
872
873
874
875 debugData.retain();
876 ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
877
878 if (future.isDone()) {
879 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
880 } else {
881 future.addListener(new ChannelFutureListener() {
882 @Override
883 public void operationComplete(ChannelFuture future) throws Exception {
884 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
885 }
886 });
887 }
888
889 return future;
890 }
891
892
893
894
895
896 private void checkCloseConnection(ChannelFuture future) {
897
898
899 if (closeListener != null && isGracefulShutdownComplete()) {
900 ChannelFutureListener closeListener = this.closeListener;
901
902
903 this.closeListener = null;
904 try {
905 closeListener.operationComplete(future);
906 } catch (Exception e) {
907 throw new IllegalStateException("Close listener threw an unexpected exception", e);
908 }
909 }
910 }
911
912
913
914
915
916 private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
917 long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
918 int lastKnownStream;
919 if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
920
921
922
923
924 lastKnownStream = Integer.MAX_VALUE;
925 } else {
926 lastKnownStream = connection().remote().lastStreamCreated();
927 }
928 return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
929 }
930
931 private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
932 if (future.isSuccess()) {
933 closeStream(stream, future);
934 } else {
935
936 onConnectionError(ctx, true, future.cause(), null);
937 }
938 }
939
940 private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
941 if (!future.isSuccess()) {
942 onConnectionError(ctx, true, future.cause(), null);
943 }
944 }
945
946 private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
947 stream.close();
948 checkCloseConnection(future);
949 }
950
951
952
953
954 private static ByteBuf clientPrefaceString(Http2Connection connection) {
955 return connection.isServer() ? connectionPrefaceBuf() : null;
956 }
957
958 private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
959 final long errorCode, final ByteBuf debugData, ChannelFuture future) {
960 try {
961 if (future.isSuccess()) {
962 if (errorCode != NO_ERROR.code()) {
963 if (logger.isDebugEnabled()) {
964 logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
965 "debugData '{}'. Forcing shutdown of the connection.",
966 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
967 }
968 ctx.close();
969 }
970 } else {
971 if (logger.isDebugEnabled()) {
972 logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
973 "debugData '{}'. Forcing shutdown of the connection.",
974 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
975 }
976 ctx.close();
977 }
978 } finally {
979
980 debugData.release();
981 }
982 }
983
984
985
986
987 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
988 private final ChannelHandlerContext ctx;
989 private final ChannelPromise promise;
990 private final Future<?> timeoutTask;
991 private boolean closed;
992
993 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
994 this.ctx = ctx;
995 this.promise = promise;
996 timeoutTask = null;
997 }
998
999 ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
1000 long timeout, TimeUnit unit) {
1001 this.ctx = ctx;
1002 this.promise = promise;
1003 timeoutTask = ctx.executor().schedule(new Runnable() {
1004 @Override
1005 public void run() {
1006 doClose();
1007 }
1008 }, timeout, unit);
1009 }
1010
1011 @Override
1012 public void operationComplete(ChannelFuture sentGoAwayFuture) {
1013 if (timeoutTask != null) {
1014 timeoutTask.cancel(false);
1015 }
1016 doClose();
1017 }
1018
1019 private void doClose() {
1020
1021
1022 if (closed) {
1023
1024 assert timeoutTask != null;
1025 return;
1026 }
1027 closed = true;
1028 if (promise == null) {
1029 ctx.close();
1030 } else {
1031 ctx.close(promise);
1032 }
1033 }
1034 }
1035
1036 private final class PrefaceSendListener implements ChannelFutureListener {
1037 private final ChannelHandlerContext ctx;
1038 private final ChannelPromise promise;
1039
1040 PrefaceSendListener(ChannelHandlerContext ctx, ChannelPromise promise) {
1041 this.ctx = ctx;
1042 this.promise = promise;
1043 }
1044
1045 @Override
1046 public void operationComplete(ChannelFuture f) {
1047 if (f.isSuccess()) {
1048 try {
1049 if (byteDecoder != null) {
1050 byteDecoder.sendPrefaceIfNeeded(ctx);
1051 }
1052 } catch (Throwable e) {
1053 promise.setFailure(e);
1054 return;
1055 }
1056 promise.setSuccess();
1057 } else {
1058 promise.setFailure(f.cause());
1059 }
1060 }
1061 }
1062 }