1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelOutboundHandler;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.http.HttpResponseStatus;
27 import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28 import io.netty.handler.codec.http2.Http2Exception.StreamException;
29 import io.netty.util.CharsetUtil;
30 import io.netty.util.concurrent.Future;
31 import io.netty.util.internal.UnstableApi;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.net.SocketAddress;
36 import java.util.List;
37 import java.util.concurrent.TimeUnit;
38
39 import static io.netty.buffer.ByteBufUtil.hexDump;
40 import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
41 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
42 import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
43 import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
44 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
45 import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
46 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
47 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
48 import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
49 import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
50 import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
51 import static io.netty.util.CharsetUtil.UTF_8;
52 import static io.netty.util.internal.ObjectUtil.checkNotNull;
53 import static java.lang.Math.min;
54 import static java.util.concurrent.TimeUnit.MILLISECONDS;
55
56
57
58
59
60
61
62
63
64
65 @UnstableApi
66 public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
67 ChannelOutboundHandler {
68
69 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
70
71 private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
72 HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
73 private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
74 Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();
75
76 private final Http2ConnectionDecoder decoder;
77 private final Http2ConnectionEncoder encoder;
78 private final Http2Settings initialSettings;
79 private final boolean decoupleCloseAndGoAway;
80 private final boolean flushPreface;
81 private ChannelFutureListener closeListener;
82 private BaseDecoder byteDecoder;
83 private long gracefulShutdownTimeoutMillis;
84
85 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
86 Http2Settings initialSettings) {
87 this(decoder, encoder, initialSettings, false);
88 }
89
90 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
91 Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
92 this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
93 }
94
95 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
96 Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
97 boolean flushPreface) {
98 this.initialSettings = checkNotNull(initialSettings, "initialSettings");
99 this.decoder = checkNotNull(decoder, "decoder");
100 this.encoder = checkNotNull(encoder, "encoder");
101 this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
102 this.flushPreface = flushPreface;
103 if (encoder.connection() != decoder.connection()) {
104 throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
105 }
106 }
107
108
109
110
111
112
113 public long gracefulShutdownTimeoutMillis() {
114 return gracefulShutdownTimeoutMillis;
115 }
116
117
118
119
120
121
122
123 public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
124 if (gracefulShutdownTimeoutMillis < -1) {
125 throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
126 " (expected: -1 for indefinite or >= 0)");
127 }
128 this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
129 }
130
131 public Http2Connection connection() {
132 return encoder.connection();
133 }
134
135 public Http2ConnectionDecoder decoder() {
136 return decoder;
137 }
138
139 public Http2ConnectionEncoder encoder() {
140 return encoder;
141 }
142
143 private boolean prefaceSent() {
144 return byteDecoder != null && byteDecoder.prefaceSent();
145 }
146
147
148
149
150
151 public void onHttpClientUpgrade() throws Http2Exception {
152 if (connection().isServer()) {
153 throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
154 }
155 if (!prefaceSent()) {
156
157
158 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
159 }
160 if (decoder.prefaceReceived()) {
161 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
162 }
163
164
165 connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
166 }
167
168
169
170
171
172 public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
173 if (!connection().isServer()) {
174 throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
175 }
176 if (!prefaceSent()) {
177
178
179 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
180 }
181 if (decoder.prefaceReceived()) {
182 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
183 }
184
185
186 encoder.remoteSettings(settings);
187
188
189 connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
190 }
191
192 @Override
193 public void flush(ChannelHandlerContext ctx) {
194 try {
195
196 encoder.flowController().writePendingBytes();
197 ctx.flush();
198 } catch (Http2Exception e) {
199 onError(ctx, true, e);
200 } catch (Throwable cause) {
201 onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
202 }
203 }
204
205 private abstract class BaseDecoder {
206 public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
207 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
208 public void channelActive(ChannelHandlerContext ctx) throws Exception { }
209
210 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
211
212 encoder().close();
213 decoder().close();
214
215
216
217 connection().close(ctx.voidPromise());
218 }
219
220
221
222
223 public boolean prefaceSent() {
224 return true;
225 }
226 }
227
228 private final class PrefaceDecoder extends BaseDecoder {
229 private ByteBuf clientPrefaceString;
230 private boolean prefaceSent;
231
232 PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
233 clientPrefaceString = clientPrefaceString(encoder.connection());
234
235
236 sendPreface(ctx);
237 }
238
239 @Override
240 public boolean prefaceSent() {
241 return prefaceSent;
242 }
243
244 @Override
245 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
246 try {
247 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
248
249 byteDecoder = new FrameDecoder();
250 byteDecoder.decode(ctx, in, out);
251 }
252 } catch (Throwable e) {
253 onError(ctx, false, e);
254 }
255 }
256
257 @Override
258 public void channelActive(ChannelHandlerContext ctx) throws Exception {
259
260 sendPreface(ctx);
261
262 if (flushPreface) {
263
264
265
266 ctx.flush();
267 }
268 }
269
270 @Override
271 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
272 cleanup();
273 super.channelInactive(ctx);
274 }
275
276
277
278
279 @Override
280 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
281 cleanup();
282 }
283
284
285
286
287 private void cleanup() {
288 if (clientPrefaceString != null) {
289 clientPrefaceString.release();
290 clientPrefaceString = null;
291 }
292 }
293
294
295
296
297
298
299
300 private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
301 if (clientPrefaceString == null) {
302 return true;
303 }
304
305 int prefaceRemaining = clientPrefaceString.readableBytes();
306 int bytesRead = min(in.readableBytes(), prefaceRemaining);
307
308
309 if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
310 clientPrefaceString, clientPrefaceString.readerIndex(),
311 bytesRead)) {
312 int maxSearch = 1024;
313 int http1Index =
314 ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
315 if (http1Index != -1) {
316 String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
317 throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
318 }
319 String receivedBytes = hexDump(in, in.readerIndex(),
320 min(in.readableBytes(), clientPrefaceString.readableBytes()));
321 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
322 "Hex dump for received bytes: %s", receivedBytes);
323 }
324 in.skipBytes(bytesRead);
325 clientPrefaceString.skipBytes(bytesRead);
326
327 if (!clientPrefaceString.isReadable()) {
328
329 clientPrefaceString.release();
330 clientPrefaceString = null;
331 return true;
332 }
333 return false;
334 }
335
336
337
338
339
340
341
342
343
344 private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
345 if (in.readableBytes() < 5) {
346
347 return false;
348 }
349
350 short frameType = in.getUnsignedByte(in.readerIndex() + 3);
351 short flags = in.getUnsignedByte(in.readerIndex() + 4);
352 if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {
353 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
354 "Hex dump for first 5 bytes: %s",
355 hexDump(in, in.readerIndex(), 5));
356 }
357 return true;
358 }
359
360
361
362
363 private void sendPreface(ChannelHandlerContext ctx) throws Exception {
364 if (prefaceSent || !ctx.channel().isActive()) {
365 return;
366 }
367
368 prefaceSent = true;
369
370 final boolean isClient = !connection().isServer();
371 if (isClient) {
372
373 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
374 }
375
376
377 encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
378 ChannelFutureListener.CLOSE_ON_FAILURE);
379
380 if (isClient) {
381
382
383
384 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
385 }
386 }
387 }
388
389 private final class FrameDecoder extends BaseDecoder {
390 @Override
391 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
392 try {
393 decoder.decodeFrame(ctx, in, out);
394 } catch (Throwable e) {
395 onError(ctx, false, e);
396 }
397 }
398 }
399
400 @Override
401 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
402
403 encoder.lifecycleManager(this);
404 decoder.lifecycleManager(this);
405 encoder.flowController().channelHandlerContext(ctx);
406 decoder.flowController().channelHandlerContext(ctx);
407 byteDecoder = new PrefaceDecoder(ctx);
408 }
409
410 @Override
411 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
412 if (byteDecoder != null) {
413 byteDecoder.handlerRemoved(ctx);
414 byteDecoder = null;
415 }
416 }
417
418 @Override
419 public void channelActive(ChannelHandlerContext ctx) throws Exception {
420 if (byteDecoder == null) {
421 byteDecoder = new PrefaceDecoder(ctx);
422 }
423 byteDecoder.channelActive(ctx);
424 super.channelActive(ctx);
425 }
426
427 @Override
428 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
429
430 super.channelInactive(ctx);
431 if (byteDecoder != null) {
432 byteDecoder.channelInactive(ctx);
433 byteDecoder = null;
434 }
435 }
436
437 @Override
438 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
439
440
441 try {
442 if (ctx.channel().isWritable()) {
443 flush(ctx);
444 }
445 encoder.flowController().channelWritabilityChanged();
446 } finally {
447 super.channelWritabilityChanged(ctx);
448 }
449 }
450
451 @Override
452 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
453 byteDecoder.decode(ctx, in, out);
454 }
455
456 @Override
457 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
458 ctx.bind(localAddress, promise);
459 }
460
461 @Override
462 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
463 ChannelPromise promise) throws Exception {
464 ctx.connect(remoteAddress, localAddress, promise);
465 }
466
467 @Override
468 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
469 ctx.disconnect(promise);
470 }
471
472 @Override
473 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
474 if (decoupleCloseAndGoAway) {
475 ctx.close(promise);
476 return;
477 }
478 promise = promise.unvoid();
479
480 if (!ctx.channel().isActive() || !prefaceSent()) {
481 ctx.close(promise);
482 return;
483 }
484
485
486
487
488
489
490 ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
491 ctx.flush();
492 doGracefulShutdown(ctx, f, promise);
493 }
494
495 private ChannelFutureListener newClosingChannelFutureListener(
496 ChannelHandlerContext ctx, ChannelPromise promise) {
497 long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
498 return gracefulShutdownTimeoutMillis < 0 ?
499 new ClosingChannelFutureListener(ctx, promise) :
500 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
501 }
502
503 private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
504 final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
505 if (isGracefulShutdownComplete()) {
506
507
508 future.addListener(listener);
509 } else {
510
511
512
513
514 if (closeListener == null) {
515 closeListener = listener;
516 } else if (promise != null) {
517 final ChannelFutureListener oldCloseListener = closeListener;
518 closeListener = new ChannelFutureListener() {
519 @Override
520 public void operationComplete(ChannelFuture future) throws Exception {
521 try {
522 oldCloseListener.operationComplete(future);
523 } finally {
524 listener.operationComplete(future);
525 }
526 }
527 };
528 }
529 }
530 }
531
532 @Override
533 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
534 ctx.deregister(promise);
535 }
536
537 @Override
538 public void read(ChannelHandlerContext ctx) throws Exception {
539 ctx.read();
540 }
541
542 @Override
543 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
544 ctx.write(msg, promise);
545 }
546
547 @Override
548 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
549
550
551 try {
552
553 channelReadComplete0(ctx);
554 } finally {
555 flush(ctx);
556 }
557 }
558
559 final void channelReadComplete0(ChannelHandlerContext ctx) {
560
561 discardSomeReadBytes();
562
563
564
565
566 if (!ctx.channel().config().isAutoRead()) {
567 ctx.read();
568 }
569
570 ctx.fireChannelReadComplete();
571 }
572
573
574
575
576 @Override
577 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
578 if (getEmbeddedHttp2Exception(cause) != null) {
579
580 onError(ctx, false, cause);
581 } else {
582 super.exceptionCaught(ctx, cause);
583 }
584 }
585
586
587
588
589
590
591
592
593 @Override
594 public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
595 switch (stream.state()) {
596 case HALF_CLOSED_LOCAL:
597 case OPEN:
598 stream.closeLocalSide();
599 break;
600 default:
601 closeStream(stream, future);
602 break;
603 }
604 }
605
606
607
608
609
610
611
612
613 @Override
614 public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
615 switch (stream.state()) {
616 case HALF_CLOSED_REMOTE:
617 case OPEN:
618 stream.closeRemoteSide();
619 break;
620 default:
621 closeStream(stream, future);
622 break;
623 }
624 }
625
626 @Override
627 public void closeStream(final Http2Stream stream, ChannelFuture future) {
628 stream.close();
629
630 if (future.isDone()) {
631 checkCloseConnection(future);
632 } else {
633 future.addListener(new ChannelFutureListener() {
634 @Override
635 public void operationComplete(ChannelFuture future) throws Exception {
636 checkCloseConnection(future);
637 }
638 });
639 }
640 }
641
642
643
644
645 @Override
646 public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
647 Http2Exception embedded = getEmbeddedHttp2Exception(cause);
648 if (isStreamError(embedded)) {
649 onStreamError(ctx, outbound, cause, (StreamException) embedded);
650 } else if (embedded instanceof CompositeStreamException) {
651 CompositeStreamException compositException = (CompositeStreamException) embedded;
652 for (StreamException streamException : compositException) {
653 onStreamError(ctx, outbound, cause, streamException);
654 }
655 } else {
656 onConnectionError(ctx, outbound, cause, embedded);
657 }
658 ctx.flush();
659 }
660
661
662
663
664
665
666 protected boolean isGracefulShutdownComplete() {
667 return connection().numActiveStreams() == 0;
668 }
669
670
671
672
673
674
675
676
677
678
679
680 protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
681 Throwable cause, Http2Exception http2Ex) {
682 if (http2Ex == null) {
683 http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
684 }
685
686 ChannelPromise promise = ctx.newPromise();
687 ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
688 if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
689 doGracefulShutdown(ctx, future, promise);
690 } else {
691 future.addListener(newClosingChannelFutureListener(ctx, promise));
692 }
693 }
694
695
696
697
698
699
700
701
702
703
704 protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
705 @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
706 final int streamId = http2Ex.streamId();
707 Http2Stream stream = connection().stream(streamId);
708
709
710 if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
711 ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
712 connection().isServer()) {
713
714
715
716
717
718
719
720 if (stream == null) {
721 try {
722 stream = encoder.connection().remote().createStream(streamId, true);
723 } catch (Http2Exception e) {
724 resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
725 return;
726 }
727 }
728
729
730 if (stream != null && !stream.isHeadersSent()) {
731 try {
732 handleServerHeaderDecodeSizeError(ctx, stream);
733 } catch (Throwable cause2) {
734 onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
735 }
736 }
737 }
738
739 if (stream == null) {
740 if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
741 resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
742 }
743 } else {
744 resetStream(ctx, stream, http2Ex.error().code(), ctx.newPromise());
745 }
746 }
747
748
749
750
751
752
753
754
755 protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
756 encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
757 }
758
759 protected Http2FrameWriter frameWriter() {
760 return encoder().frameWriter();
761 }
762
763
764
765
766
767
768 private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
769 ChannelPromise promise) {
770 ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
771 if (future.isDone()) {
772 closeConnectionOnError(ctx, future);
773 } else {
774 future.addListener(new ChannelFutureListener() {
775 @Override
776 public void operationComplete(ChannelFuture future) throws Exception {
777 closeConnectionOnError(ctx, future);
778 }
779 });
780 }
781 return future;
782 }
783
784 @Override
785 public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
786 ChannelPromise promise) {
787 final Http2Stream stream = connection().stream(streamId);
788 if (stream == null) {
789 return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
790 }
791
792 return resetStream(ctx, stream, errorCode, promise);
793 }
794
795 private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
796 long errorCode, ChannelPromise promise) {
797 promise = promise.unvoid();
798 if (stream.isResetSent()) {
799
800 return promise.setSuccess();
801 }
802
803
804
805
806
807 stream.resetSent();
808
809 final ChannelFuture future;
810
811
812 if (stream.state() == IDLE ||
813 connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
814 future = promise.setSuccess();
815 } else {
816 future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
817 }
818 if (future.isDone()) {
819 processRstStreamWriteResult(ctx, stream, future);
820 } else {
821 future.addListener(new ChannelFutureListener() {
822 @Override
823 public void operationComplete(ChannelFuture future) throws Exception {
824 processRstStreamWriteResult(ctx, stream, future);
825 }
826 });
827 }
828
829 return future;
830 }
831
832 @Override
833 public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
834 final ByteBuf debugData, ChannelPromise promise) {
835 promise = promise.unvoid();
836 final Http2Connection connection = connection();
837 try {
838 if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
839 debugData.release();
840 promise.trySuccess();
841 return promise;
842 }
843 } catch (Throwable cause) {
844 debugData.release();
845 promise.tryFailure(cause);
846 return promise;
847 }
848
849
850
851 debugData.retain();
852 ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
853
854 if (future.isDone()) {
855 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
856 } else {
857 future.addListener(new ChannelFutureListener() {
858 @Override
859 public void operationComplete(ChannelFuture future) throws Exception {
860 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
861 }
862 });
863 }
864
865 return future;
866 }
867
868
869
870
871
872 private void checkCloseConnection(ChannelFuture future) {
873
874
875 if (closeListener != null && isGracefulShutdownComplete()) {
876 ChannelFutureListener closeListener = this.closeListener;
877
878
879 this.closeListener = null;
880 try {
881 closeListener.operationComplete(future);
882 } catch (Exception e) {
883 throw new IllegalStateException("Close listener threw an unexpected exception", e);
884 }
885 }
886 }
887
888
889
890
891
892 private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
893 long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
894 int lastKnownStream;
895 if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
896
897
898
899
900 lastKnownStream = Integer.MAX_VALUE;
901 } else {
902 lastKnownStream = connection().remote().lastStreamCreated();
903 }
904 return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
905 }
906
907 private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
908 if (future.isSuccess()) {
909 closeStream(stream, future);
910 } else {
911
912 onConnectionError(ctx, true, future.cause(), null);
913 }
914 }
915
916 private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
917 if (!future.isSuccess()) {
918 onConnectionError(ctx, true, future.cause(), null);
919 }
920 }
921
922
923
924
925 private static ByteBuf clientPrefaceString(Http2Connection connection) {
926 return connection.isServer() ? connectionPrefaceBuf() : null;
927 }
928
929 private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
930 final long errorCode, final ByteBuf debugData, ChannelFuture future) {
931 try {
932 if (future.isSuccess()) {
933 if (errorCode != NO_ERROR.code()) {
934 if (logger.isDebugEnabled()) {
935 logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
936 "debugData '{}'. Forcing shutdown of the connection.",
937 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
938 }
939 ctx.close();
940 }
941 } else {
942 if (logger.isDebugEnabled()) {
943 logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
944 "debugData '{}'. Forcing shutdown of the connection.",
945 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
946 }
947 ctx.close();
948 }
949 } finally {
950
951 debugData.release();
952 }
953 }
954
955
956
957
958 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
959 private final ChannelHandlerContext ctx;
960 private final ChannelPromise promise;
961 private final Future<?> timeoutTask;
962 private boolean closed;
963
964 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
965 this.ctx = ctx;
966 this.promise = promise;
967 timeoutTask = null;
968 }
969
970 ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
971 long timeout, TimeUnit unit) {
972 this.ctx = ctx;
973 this.promise = promise;
974 timeoutTask = ctx.executor().schedule(new Runnable() {
975 @Override
976 public void run() {
977 doClose();
978 }
979 }, timeout, unit);
980 }
981
982 @Override
983 public void operationComplete(ChannelFuture sentGoAwayFuture) {
984 if (timeoutTask != null) {
985 timeoutTask.cancel(false);
986 }
987 doClose();
988 }
989
990 private void doClose() {
991
992
993 if (closed) {
994
995 assert timeoutTask != null;
996 return;
997 }
998 closed = true;
999 if (promise == null) {
1000 ctx.close();
1001 } else {
1002 ctx.close(promise);
1003 }
1004 }
1005 }
1006 }